ASTERIXDB-969: Redesigned recovery analysis phase to spill to disk

Change-Id: Ide2b346c2ad498d7595e71bae890362c2143d301
Reviewed-on: https://asterix-gerrit.ics.uci.edu/458
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
index 79d45e4..4bcbada 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
@@ -25,6 +25,11 @@
      * 
      */
     private static final long serialVersionUID = 1L;
+    /**
+     * The number of bytes used to represent {@link DatasetId} value.
+     */
+    public static final int BYTES = Integer.BYTES;
+
     int id;
 
     public DatasetId(int id) {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java
index 3c80e67..046487a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java
@@ -22,6 +22,10 @@
 
 public class JobId implements Serializable {
     private static final long serialVersionUID = 1L;
+    /**
+     * The number of bytes used to represent {@link JobId} value.
+     */
+    public static final int BYTES = Integer.BYTES;
 
     private int id;
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 60e3097..a510e51 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -101,21 +101,19 @@
         checksumGen = new CRC32();
     }
 
-    private final static int TYPE_LEN = Byte.SIZE/Byte.SIZE;
-    private final static int JID_LEN = Integer.SIZE / Byte.SIZE;
-    private final static int DSID_LEN = Integer.SIZE / Byte.SIZE;
-    private final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE;
-    private final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE;
+    private final static int TYPE_LEN = Byte.SIZE / Byte.SIZE;
+    public final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE;
+    public final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE;
     private final static int PRVLSN_LEN = Long.SIZE / Byte.SIZE;
     private final static int RSID_LEN = Long.SIZE / Byte.SIZE;
     private final static int LOGRCD_SZ_LEN = Integer.SIZE / Byte.SIZE;
     private final static int FLDCNT_LEN = Integer.SIZE / Byte.SIZE;
-    private final static int NEWOP_LEN = Byte.SIZE/Byte.SIZE;
+    private final static int NEWOP_LEN = Byte.SIZE / Byte.SIZE;
     private final static int NEWVALSZ_LEN = Integer.SIZE / Byte.SIZE;
     private final static int CHKSUM_LEN = Long.SIZE / Byte.SIZE;
 
-    private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JID_LEN;
-    private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DSID_LEN + PKHASH_LEN + PKSZ_LEN;
+    private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JobId.BYTES;
+    private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
     private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN;
     private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
 
@@ -142,11 +140,11 @@
             buffer.putInt(newValueSize);
             writeTuple(buffer, newValue, newValueSize);
         }
-        
+
         if (logType == LogType.FLUSH) {
             buffer.putInt(datasetId);
         }
-        
+
         checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
         buffer.putLong(checksum);
     }
@@ -174,20 +172,19 @@
     public RECORD_STATUS readLogRecord(ByteBuffer buffer) {
         int beginOffset = buffer.position();
         //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
-        if(buffer.remaining() < ALL_RECORD_HEADER_LEN) {
+        if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
             buffer.position(beginOffset);
             return RECORD_STATUS.TRUNCATED;
         }
         logType = buffer.get();
         jobId = buffer.getInt();
-        if(logType != LogType.FLUSH)
-        {
+        if (logType != LogType.FLUSH) {
             if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
                 datasetId = -1;
                 PKHashValue = -1;
             } else {
                 //attempt to read in the dsid, PK hash and PK length
-                if(buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN){
+                if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
                     buffer.position(beginOffset);
                     return RECORD_STATUS.TRUNCATED;
                 }
@@ -195,7 +192,7 @@
                 PKHashValue = buffer.getInt();
                 PKValueSize = buffer.getInt();
                 //attempt to read in the PK
-                if(buffer.remaining() < PKValueSize){
+                if (buffer.remaining() < PKValueSize) {
                     buffer.position(beginOffset);
                     return RECORD_STATUS.TRUNCATED;
                 }
@@ -206,7 +203,7 @@
             }
             if (logType == LogType.UPDATE) {
                 //attempt to read in the previous LSN, log size, new value size, and new record type
-                if(buffer.remaining() <UPDATE_LSN_HEADER + UPDATE_BODY_HEADER){
+                if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
                     buffer.position(beginOffset);
                     return RECORD_STATUS.TRUNCATED;
                 }
@@ -216,7 +213,7 @@
                 fieldCnt = buffer.getInt();
                 newOp = buffer.get();
                 newValueSize = buffer.getInt();
-                if(buffer.remaining() < newValueSize){
+                if (buffer.remaining() < newValueSize) {
                     buffer.position(beginOffset);
                     return RECORD_STATUS.TRUNCATED;
                 }
@@ -224,10 +221,9 @@
             } else {
                 computeAndSetLogSize();
             }
-        }
-        else{
+        } else {
             computeAndSetLogSize();
-            if(buffer.remaining() < DSID_LEN){
+            if (buffer.remaining() < DatasetId.BYTES) {
                 buffer.position(beginOffset);
                 return RECORD_STATUS.TRUNCATED;
             }
@@ -235,7 +231,7 @@
             resourceId = 0l;
         }
         //atempt to read checksum
-        if(buffer.remaining() < CHKSUM_LEN){
+        if (buffer.remaining() < CHKSUM_LEN) {
             buffer.position(beginOffset);
             return RECORD_STATUS.TRUNCATED;
         }
@@ -274,7 +270,7 @@
         this.PKHashValue = -1;
         computeAndSetLogSize();
     }
-    
+
     public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker) {
         this.logType = LogType.FLUSH;
         this.jobId = -1;
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 82ad32d..fec04e4 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -24,9 +24,15 @@
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -45,17 +51,21 @@
 import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogReader;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -83,6 +93,11 @@
     private final LogManager logMgr;
     private final int checkpointHistory;
     private final long SHARP_CHECKPOINT_LSN = -1;
+    private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
+    private static final long MEGABYTE = 1024L * 1024L;
+    private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
+    private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //2MB;
+
     /**
      * A file at a known location that contains the LSN of the last log record
      * traversed doing a successful checkpoint.
@@ -105,8 +120,7 @@
      * not supported, yet.
      */
     public SystemState getSystemState() throws ACIDException {
-
-        //#. read checkpoint file
+        //read checkpoint file
         CheckpointObject checkpointObject = null;
         try {
             checkpointObject = readCheckpoint();
@@ -143,6 +157,8 @@
     }
 
     public void startRecovery(boolean synchronous) throws IOException, ACIDException {
+        //delete any recovery files from previous failed recovery attempts
+        deleteRecoveryTemporaryFiles();
 
         int updateLogCount = 0;
         int entityCommitLogCount = 0;
@@ -152,18 +168,13 @@
         int jobId = -1;
 
         state = SystemState.RECOVERING;
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("[RecoveryMgr] starting recovery ...");
-        }
+        LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ...");
 
         Set<Integer> winnerJobSet = new HashSet<Integer>();
-        Map<Integer, Set<TxnId>> jobId2WinnerEntitiesMap = new HashMap<Integer, Set<TxnId>>();
-        //winnerEntity is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId>
-        Set<TxnId> winnerEntitySet = null;
-        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
-        TxnId winnerEntity = null;
+        jobId2WinnerEntitiesMap = new HashMap<>();
 
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+        JobEntityCommits jobEntityWinners = null;
         //#. read checkpoint file and set lowWaterMark where anaylsis and redo start
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         CheckpointObject checkpointObject = readCheckpoint();
@@ -177,215 +188,224 @@
         //  [ analysis phase ]
         //  - collect all committed Lsn 
         //-------------------------------------------------------------------------
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("[RecoveryMgr] in analysis phase");
-        }
+        LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase");
 
         //#. set log reader to the lowWaterMarkLsn
         ILogReader logReader = logMgr.getLogReader(true);
-        logReader.initializeScan(lowWaterMarkLSN);
-        ILogRecord logRecord = logReader.next();
-        while (logRecord != null) {
-            if (IS_DEBUG_MODE) {
-                LOGGER.info(logRecord.getLogRecordForDisplay());
-            }
-            //update max jobId
-            if (logRecord.getJobId() > maxJobId) {
-                maxJobId = logRecord.getJobId();
-            }
-            switch (logRecord.getLogType()) {
-                case LogType.UPDATE:
-                    updateLogCount++;
-                    break;
-                case LogType.JOB_COMMIT:
-                    winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
-                    jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
-                    jobCommitLogCount++;
-                    break;
-                case LogType.ENTITY_COMMIT:
-                    jobId = logRecord.getJobId();
-                    winnerEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                            logRecord.getPKValue(), logRecord.getPKValueSize(), true);
-                    if (!jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
-                        winnerEntitySet = new HashSet<TxnId>();
-                        jobId2WinnerEntitiesMap.put(Integer.valueOf(jobId), winnerEntitySet);
-                    } else {
-                        winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
-                    }
-                    winnerEntitySet.add(winnerEntity);
-                    entityCommitLogCount++;
-                    break;
-                case LogType.ABORT:
-                    abortLogCount++;
-                    break;
-                case LogType.FLUSH:
-                    break;
-                default:
-                    throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-            }
+        ILogRecord logRecord = null;
+        try {
+            logReader.initializeScan(lowWaterMarkLSN);
             logRecord = logReader.next();
-        }
-
-        //-------------------------------------------------------------------------
-        //  [ redo phase ]
-        //  - redo if
-        //    1) The TxnId is committed && --> guarantee durability
-        //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
-        //-------------------------------------------------------------------------
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("[RecoveryMgr] in redo phase");
-        }
-        long resourceId;
-        long maxDiskLastLsn;
-        long LSN = -1;
-        ILSMIndex index = null;
-        LocalResource localResource = null;
-        ILocalResourceMetadata localResourceMetadata = null;
-        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
-        boolean foundWinner = false;
-
-        //#. get indexLifeCycleManager 
-        IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
-        IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
-        ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
-
-        Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
-                .loadAndGetAllResources();
-        //#. set log reader to the lowWaterMarkLsn again.
-        logReader.initializeScan(lowWaterMarkLSN);
-        logRecord = logReader.next();
-        while (logRecord != null) {
-            if (IS_DEBUG_MODE) {
-                LOGGER.info(logRecord.getLogRecordForDisplay());
-            }
-            LSN = logRecord.getLSN();
-            jobId = logRecord.getJobId();
-            foundWinner = false;
-            switch (logRecord.getLogType()) {
-                case LogType.UPDATE:
-                    if (winnerJobSet.contains(Integer.valueOf(jobId))) {
-                        foundWinner = true;
-                    } else if (jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
-                        winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
-                        tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                                logRecord.getPKValue(), logRecord.getPKValueSize());
-                        if (winnerEntitySet.contains(tempKeyTxnId)) {
-                            foundWinner = true;
+            while (logRecord != null) {
+                if (IS_DEBUG_MODE) {
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                }
+                //update max jobId
+                if (logRecord.getJobId() > maxJobId) {
+                    maxJobId = logRecord.getJobId();
+                }
+                switch (logRecord.getLogType()) {
+                    case LogType.UPDATE:
+                        updateLogCount++;
+                        break;
+                    case LogType.JOB_COMMIT:
+                        jobId = logRecord.getJobId();
+                        winnerJobSet.add(jobId);
+                        if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                            jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                            //to delete any spilled files as well
+                            jobEntityWinners.clear();
+                            jobId2WinnerEntitiesMap.remove(jobId);
                         }
-                    }
-                    if (foundWinner) {
-                        resourceId = logRecord.getResourceId();
-                        localResource = resourcesMap.get(resourceId);
-
-                        /*******************************************************************
-                         * [Notice]
-                         * -> Issue
-                         * Delete index may cause a problem during redo.
-                         * The index operation to be redone couldn't be redone because the corresponding index
-                         * may not exist in NC due to the possible index drop DDL operation.
-                         * -> Approach
-                         * Avoid the problem during redo.
-                         * More specifically, the problem will be detected when the localResource of
-                         * the corresponding index is retrieved, which will end up with 'null'.
-                         * If null is returned, then just go and process the next
-                         * log record.
-                         *******************************************************************/
-                        if (localResource == null) {
-                            logRecord = logReader.next();
-                            continue;
-                        }
-                        /*******************************************************************/
-
-                        //get index instance from IndexLifeCycleManager
-                        //if index is not registered into IndexLifeCycleManager,
-                        //create the index using LocalMetadata stored in LocalResourceRepository
-                        index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName());
-                        if (index == null) {
-                            //#. create index instance and register to indexLifeCycleManager
-                            localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
-                            index = localResourceMetadata.createIndexInstance(appRuntimeContext,
-                                    localResource.getResourceName(), localResource.getPartition());
-                            indexLifecycleManager.register(localResource.getResourceName(), index);
-                            indexLifecycleManager.open(localResource.getResourceName());
-
-                            //#. get maxDiskLastLSN
-                            ILSMIndex lsmIndex = (ILSMIndex) index;
-                            maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
-                                    .getComponentLSN(lsmIndex.getImmutableComponents());
-
-                            //#. set resourceId and maxDiskLastLSN to the map
-                            resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
+                        jobCommitLogCount++;
+                        break;
+                    case LogType.ENTITY_COMMIT:
+                        jobId = logRecord.getJobId();
+                        if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                            jobEntityWinners = new JobEntityCommits(jobId);
+                            if (needToFreeMemory()) {
+                                //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk.
+                                //This could happen only when we have many jobs with small number of records and none of them have job commit.
+                                freeJobsCachedEntities(jobId);
+                            }
+                            jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
                         } else {
-                            maxDiskLastLsn = resourceId2MaxLSNMap.get(Long.valueOf(resourceId));
+                            jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
                         }
-
-                        if (LSN > maxDiskLastLsn) {
-                            redo(logRecord);
-                            redoCount++;
-                        }
-                    }
-                    break;
-
-                case LogType.JOB_COMMIT:
-                case LogType.ENTITY_COMMIT:
-                case LogType.ABORT:
-                case LogType.FLUSH:
-
-                    //do nothing
-                    break;
-
-                default:
-                    throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                        jobEntityWinners.add(logRecord);
+                        entityCommitLogCount++;
+                        break;
+                    case LogType.ABORT:
+                        abortLogCount++;
+                        break;
+                    case LogType.FLUSH:
+                        break;
+                    default:
+                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                }
+                logRecord = logReader.next();
             }
+
+            //prepare winners for search after analysis is done to flush anything remaining in memory to disk.
+            for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
+                winners.prepareForSearch();
+            }
+            //-------------------------------------------------------------------------
+            //  [ redo phase ]
+            //  - redo if
+            //    1) The TxnId is committed && --> guarantee durability
+            //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
+            //-------------------------------------------------------------------------
+            LOGGER.info("[RecoveryMgr] in redo phase");
+
+            long resourceId;
+            long maxDiskLastLsn;
+            long LSN = -1;
+            ILSMIndex index = null;
+            LocalResource localResource = null;
+            ILocalResourceMetadata localResourceMetadata = null;
+            Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+            boolean foundWinner = false;
+
+            IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+            IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
+            ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
+            Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
+                    .loadAndGetAllResources();
+
+            //set log reader to the lowWaterMarkLsn again.
+            logReader.initializeScan(lowWaterMarkLSN);
             logRecord = logReader.next();
-        }
+            while (logRecord != null) {
+                if (IS_DEBUG_MODE) {
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                }
+                LSN = logRecord.getLSN();
+                jobId = logRecord.getJobId();
+                foundWinner = false;
+                switch (logRecord.getLogType()) {
+                    case LogType.UPDATE:
+                        if (winnerJobSet.contains(jobId)) {
+                            foundWinner = true;
+                        } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                            jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                            tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                    logRecord.getPKValue(), logRecord.getPKValueSize());
+                            if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
+                                foundWinner = true;
+                            }
+                        }
+                        if (foundWinner) {
+                            resourceId = logRecord.getResourceId();
+                            localResource = resourcesMap.get(resourceId);
+                            /*******************************************************************
+                             * [Notice]
+                             * -> Issue
+                             * Delete index may cause a problem during redo.
+                             * The index operation to be redone couldn't be redone because the corresponding index
+                             * may not exist in NC due to the possible index drop DDL operation.
+                             * -> Approach
+                             * Avoid the problem during redo.
+                             * More specifically, the problem will be detected when the localResource of
+                             * the corresponding index is retrieved, which will end up with 'null'.
+                             * If null is returned, then just go and process the next
+                             * log record.
+                             *******************************************************************/
+                            if (localResource == null) {
+                                logRecord = logReader.next();
+                                continue;
+                            }
+                            /*******************************************************************/
 
-        //close all indexes
-        Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
-        for (long r : resourceIdList) {
-            indexLifecycleManager.close(resourcesMap.get(r).getResourceName());
-        }
+                            //get index instance from IndexLifeCycleManager
+                            //if index is not registered into IndexLifeCycleManager,
+                            //create the index using LocalMetadata stored in LocalResourceRepository
+                            index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName());
+                            if (index == null) {
+                                //#. create index instance and register to indexLifeCycleManager
+                                localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+                                index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+                                        localResource.getResourceName(), localResource.getPartition());
+                                indexLifecycleManager.register(localResource.getResourceName(), index);
+                                indexLifecycleManager.open(localResource.getResourceName());
 
-        logReader.close();
+                                //#. get maxDiskLastLSN
+                                ILSMIndex lsmIndex = (ILSMIndex) index;
+                                maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+                                        .getComponentLSN(lsmIndex.getImmutableComponents());
 
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("[RecoveryMgr] recovery is completed.");
-            LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
-                    + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + "/"
-                    + redoCount);
+                                //#. set resourceId and maxDiskLastLSN to the map
+                                resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
+                            } else {
+                                maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                            }
+
+                            if (LSN > maxDiskLastLsn) {
+                                redo(logRecord);
+                                redoCount++;
+                            }
+                        }
+                        break;
+                    case LogType.JOB_COMMIT:
+                    case LogType.ENTITY_COMMIT:
+                    case LogType.ABORT:
+                    case LogType.FLUSH:
+                        //do nothing
+                        break;
+                    default:
+                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                }
+                logRecord = logReader.next();
+            }
+
+            //close all indexes
+            Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
+            for (long r : resourceIdList) {
+                indexLifecycleManager.close(resourcesMap.get(r).getResourceName());
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("[RecoveryMgr] recovery is completed.");
+                LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
+                        + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount
+                        + "/" + redoCount);
+            }
+        } finally {
+            logReader.close();
+            //delete any recovery files after completing recovery
+            deleteRecoveryTemporaryFiles();
         }
     }
 
+    private static boolean needToFreeMemory() {
+        return Runtime.getRuntime().freeMemory() < MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE;
+    }
+
     @Override
     public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
             throws ACIDException, HyracksDataException {
-
         long minMCTFirstLSN;
         boolean nonSharpCheckpointSucceeded = false;
 
-        if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Starting sharp checkpoint ... ");
+        if (isSharpCheckpoint) {
+            LOGGER.log(Level.INFO, "Starting sharp checkpoint ... ");
         }
 
         TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
         String logDir = logMgr.getLogManagerProperties().getLogDir();
 
-        //#. get the filename of the previous checkpoint files which are about to be deleted 
-        //   right after the new checkpoint file is written.
+        //get the filename of the previous checkpoint files which are about to be deleted 
+        //right after the new checkpoint file is written.
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
 
         DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager) txnSubsystem
                 .getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
-        //#. flush all in-memory components if it is the sharp checkpoint
+        //flush all in-memory components if it is the sharp checkpoint
         if (isSharpCheckpoint) {
-
             datasetLifecycleManager.flushAllDatasets();
-
             minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
         } else {
-
             minMCTFirstLSN = getMinFirstLSN();
-
             if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) {
                 nonSharpCheckpointSucceeded = true;
             } else {
@@ -400,7 +420,7 @@
         FileOutputStream fos = null;
         ObjectOutputStream oosToFos = null;
         try {
-            String fileName = getFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
+            String fileName = getCheckpointFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
             fos = new FileOutputStream(fileName);
             oosToFos = new ObjectOutputStream(fos);
             oosToFos.writeObject(checkpointObject);
@@ -460,11 +480,8 @@
         long firstLSN;
         //the min first lsn can only be the current append or smaller
         long minFirstLSN = logMgr.getAppendLSN();
-
         if (openIndexList.size() > 0) {
-
             for (IIndex index : openIndexList) {
-
                 AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
                         .getIOOperationCallback();
                 if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
@@ -473,23 +490,19 @@
                 }
             }
         }
-
         return minFirstLSN;
     }
 
     private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
-
         CheckpointObject checkpointObject = null;
 
-        //#. read all checkpointObjects from the existing checkpoint files
+        //read all checkpointObjects from the existing checkpoint files
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
-
         if (prevCheckpointFiles == null || prevCheckpointFiles.length == 0) {
             throw new FileNotFoundException("Checkpoint file is not found");
         }
 
         List<CheckpointObject> checkpointObjectList = new ArrayList<CheckpointObject>();
-
         for (File file : prevCheckpointFiles) {
             FileInputStream fis = null;
             ObjectInputStream oisFromFis = null;
@@ -528,7 +541,6 @@
 
     private File[] getPreviousCheckpointFiles() {
         String logDir = ((LogManager) txnSubsystem.getLogManager()).getLogManagerProperties().getLogDir();
-
         File parentDir = new File(logDir);
 
         FilenameFilter filter = new FilenameFilter() {
@@ -542,20 +554,61 @@
             }
         };
 
-        File[] prevCheckpointFiles = parentDir.listFiles(filter);
-
-        return prevCheckpointFiles;
+        return parentDir.listFiles(filter);
     }
 
-    private String getFileName(String baseDir, String suffix) {
-
+    private String getCheckpointFileName(String baseDir, String suffix) {
         if (!baseDir.endsWith(System.getProperty("file.separator"))) {
             baseDir += System.getProperty("file.separator");
         }
-
         return baseDir + CHECKPOINT_FILENAME_PREFIX + suffix;
     }
 
+    private File createJobRecoveryFile(int jobId, String fileName) throws IOException {
+        String recoveryDirPath = getRecoveryDirPath();
+        Path JobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + jobId);
+        if (!Files.exists(JobRecoveryFolder)) {
+            Files.createDirectories(JobRecoveryFolder);
+        }
+
+        File jobRecoveryFile = new File(JobRecoveryFolder.toString() + File.separator + fileName);
+        if (!jobRecoveryFile.exists()) {
+            jobRecoveryFile.createNewFile();
+        } else {
+            throw new IOException("File: " + fileName + " for job id(" + jobId + ") already exists");
+        }
+
+        return jobRecoveryFile;
+    }
+
+    private void deleteRecoveryTemporaryFiles() throws IOException {
+        String recoveryDirPath = getRecoveryDirPath();
+        Path recoveryFolderPath = Paths.get(recoveryDirPath);
+        if (Files.exists(recoveryFolderPath)) {
+            FileUtils.deleteDirectory(recoveryFolderPath.toFile());
+        }
+    }
+
+    private String getRecoveryDirPath() {
+        String logDir = logMgr.getLogManagerProperties().getLogDir();
+        if (!logDir.endsWith(File.separator)) {
+            logDir += File.separator;
+        }
+
+        return logDir + RECOVERY_FILES_DIR_NAME;
+    }
+
+    private void freeJobsCachedEntities(int requestingJobId) throws IOException {
+        if (jobId2WinnerEntitiesMap != null) {
+            for (Entry<Integer, JobEntityCommits> jobEntityCommits : jobId2WinnerEntitiesMap.entrySet()) {
+                //if the job is not the requester, free its memory
+                if (jobEntityCommits.getKey() != requestingJobId) {
+                    jobEntityCommits.getValue().spillToDiskAndfreeMemory();
+                }
+            }
+        }
+    }
+
     /**
      * Rollback a transaction
      * 
@@ -563,136 +616,121 @@
      */
     @Override
     public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException {
-        Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
-        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
-
-        int updateLogCount = 0;
-        int entityCommitLogCount = 0;
-        int jobId = -1;
         int abortedJobId = txnContext.getJobId().getId();
-        long currentLSN = -1;
-        TxnId loserEntity = null;
-
         // Obtain the first/last log record LSNs written by the Job
         long firstLSN = txnContext.getFirstLSN();
         long lastLSN = txnContext.getLastLSN();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
-        }
 
+        LOGGER.log(Level.INFO, "rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
         // check if the transaction actually wrote some logs.
         if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(" no need to roll back as there were no operations by the transaction "
-                        + txnContext.getJobId());
-            }
+            LOGGER.log(Level.INFO,
+                    "no need to roll back as there were no operations by the transaction " + txnContext.getJobId());
             return;
         }
 
         // While reading log records from firstLsn to lastLsn, collect uncommitted txn's Lsns
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
-        }
+        LOGGER.log(Level.INFO, "collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
+
+        Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<TxnId, List<Long>>();
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+        int updateLogCount = 0;
+        int entityCommitLogCount = 0;
+        int logJobId = -1;
+        long currentLSN = -1;
+        TxnId loserEntity = null;
         List<Long> undoLSNSet = null;
+
         ILogReader logReader = logMgr.getLogReader(false);
-        logReader.initializeScan(firstLSN);
-        ILogRecord logRecord = null;
-
-        while (currentLSN < lastLSN) {
-            logRecord = logReader.next();
-            if (logRecord == null) {
-                break;
-            } else {
-                if (IS_DEBUG_MODE) {
-                    LOGGER.info(logRecord.getLogRecordForDisplay());
-                }
-                currentLSN = logRecord.getLSN();
-            }
-            jobId = logRecord.getJobId();
-            if (jobId != abortedJobId) {
-                continue;
-            }
-            tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(),
-                    logRecord.getPKValueSize());
-            switch (logRecord.getLogType()) {
-                case LogType.UPDATE:
-                    undoLSNSet = loserTxnTable.get(tempKeyTxnId);
-                    if (undoLSNSet == null) {
-                        loserEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                                logRecord.getPKValue(), logRecord.getPKValueSize(), true);
-                        undoLSNSet = new LinkedList<Long>();
-                        loserTxnTable.put(loserEntity, undoLSNSet);
-                    }
-                    undoLSNSet.add(Long.valueOf(currentLSN));
-                    updateLogCount++;
-                    if (IS_DEBUG_MODE) {
-                        LOGGER.info("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
-                                + tempKeyTxnId);
-                    }
-                    break;
-
-                case LogType.JOB_COMMIT:
-                    throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
-
-                case LogType.ENTITY_COMMIT:
-                    undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
-                    if (undoLSNSet == null) {
-                        undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
-                    }
-                    entityCommitLogCount++;
-                    if (IS_DEBUG_MODE) {
-                        LOGGER.info("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
-                                + tempKeyTxnId);
-                    }
-                    break;
-
-                case LogType.ABORT:
-                case LogType.FLUSH:
-                    //ignore
-                    break;
-
-                default:
-                    throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-            }
-        }
-        if (currentLSN != lastLSN) {
-            throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
-                    + ") during abort( " + txnContext.getJobId() + ")");
-        }
-
-        //undo loserTxn's effect
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" undoing loser transaction's effect");
-        }
-
-        Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
-        int undoCount = 0;
-        while (iter.hasNext()) {
-            //TODO 
-            //Sort the lsns in order to undo in one pass. 
-
-            Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
-            undoLSNSet = loserTxn.getValue();
-
-            for (long undoLSN : undoLSNSet) {
-                //here, all the log records are UPDATE type. So, we don't need to check the type again.
-                //read the corresponding log record to be undone.
-                logRecord = logReader.read(undoLSN);
+        try {
+            logReader.initializeScan(firstLSN);
+            ILogRecord logRecord = null;
+            while (currentLSN < lastLSN) {
+                logRecord = logReader.next();
                 if (logRecord == null) {
-                    throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
+                    break;
+                } else {
+                    currentLSN = logRecord.getLSN();
+                    if (IS_DEBUG_MODE) {
+                        LOGGER.info(logRecord.getLogRecordForDisplay());
+                    }
                 }
-                if (IS_DEBUG_MODE) {
-                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                logJobId = logRecord.getJobId();
+                if (logJobId != abortedJobId) {
+                    continue;
                 }
-                undo(logRecord);
-                undoCount++;
+                tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                        logRecord.getPKValue(), logRecord.getPKValueSize());
+                switch (logRecord.getLogType()) {
+                    case LogType.UPDATE:
+                        undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId);
+                        if (undoLSNSet == null) {
+                            loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                    logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+                            undoLSNSet = new LinkedList<Long>();
+                            jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet);
+                        }
+                        undoLSNSet.add(currentLSN);
+                        updateLogCount++;
+                        if (IS_DEBUG_MODE) {
+                            LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+                                    + tempKeyTxnId);
+                        }
+                        break;
+                    case LogType.ENTITY_COMMIT:
+                        jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
+                        entityCommitLogCount++;
+                        if (IS_DEBUG_MODE) {
+                            LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+                                    + tempKeyTxnId);
+                        }
+                        break;
+                    case LogType.JOB_COMMIT:
+                        throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
+                    case LogType.ABORT:
+                    case LogType.FLUSH:
+                        //ignore
+                        break;
+                    default:
+                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                }
             }
-        }
-        logReader.close();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" undone loser transaction's effect");
-            LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/"
-                    + entityCommitLogCount + "/" + undoCount);
+            if (currentLSN != lastLSN) {
+                throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
+                        + ") during abort( " + txnContext.getJobId() + ")");
+            }
+
+            //undo loserTxn's effect
+            LOGGER.log(Level.INFO, "undoing loser transaction's effect");
+
+            //TODO sort loser entities by smallest LSN to undo in one pass. 
+            Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
+            int undoCount = 0;
+            while (iter.hasNext()) {
+                Map.Entry<TxnId, List<Long>> loserEntity2LSNsMap = iter.next();
+                undoLSNSet = loserEntity2LSNsMap.getValue();
+                for (long undoLSN : undoLSNSet) {
+                    //here, all the log records are UPDATE type. So, we don't need to check the type again.
+                    //read the corresponding log record to be undone.
+                    logRecord = logReader.read(undoLSN);
+                    if (logRecord == null) {
+                        throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
+                    }
+                    if (IS_DEBUG_MODE) {
+                        LOGGER.info(logRecord.getLogRecordForDisplay());
+                    }
+                    undo(logRecord);
+                    undoCount++;
+                }
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("undone loser transaction's effect");
+                LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/"
+                        + entityCommitLogCount + "/" + undoCount);
+            }
+        } finally {
+            logReader.close();
         }
     }
 
@@ -746,6 +784,175 @@
             throw new IllegalStateException("Failed to redo", e);
         }
     }
+
+    private class JobEntityCommits {
+        private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
+        private final int jobId;
+        private final Set<TxnId> cachedEntityCommitTxns = new HashSet<TxnId>();
+        private final List<File> jobEntitCommitOnDiskPartitionsFiles = new ArrayList<File>();
+        //a flag indicating whether all the the commits for this jobs have been added.
+        private boolean preparedForSearch = false;
+        private TxnId winnerEntity = null;
+        private int currentPartitionSize = 0;
+        private long partitionMaxLSN = 0;
+        private String currentPartitonName;
+
+        public JobEntityCommits(int jobId) {
+            this.jobId = jobId;
+        }
+
+        public void add(ILogRecord logRecord) throws IOException {
+            if (preparedForSearch) {
+                throw new IOException("Cannot add new entity commits after preparing for search.");
+            }
+            winnerEntity = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                    logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+            cachedEntityCommitTxns.add(winnerEntity);
+            //since log file is read sequentially, LSNs are always increasing
+            partitionMaxLSN = logRecord.getLSN();
+            currentPartitionSize += winnerEntity.getCurrentSize();
+            //if the memory budget for the current partition exceeded the limit, spill it to disk and free memory
+            if (currentPartitionSize >= MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE) {
+                spillToDiskAndfreeMemory();
+            }
+        }
+
+        public void spillToDiskAndfreeMemory() throws IOException {
+            if (cachedEntityCommitTxns.size() > 0) {
+                if (!preparedForSearch) {
+                    writeCurrentPartitionToDisk();
+                }
+                cachedEntityCommitTxns.clear();
+                partitionMaxLSN = 0;
+                currentPartitionSize = 0;
+                currentPartitonName = "";
+            }
+        }
+
+        /**
+         * Call this method when no more entity commits will be added to this job.
+         * 
+         * @throws IOException
+         */
+        public void prepareForSearch() throws IOException {
+            //if we have anything left in memory, we need to spill them to disk before searching other partitions.
+            //However, if we don't have anything on disk, we will search from memory only
+            if (jobEntitCommitOnDiskPartitionsFiles.size() > 0) {
+                spillToDiskAndfreeMemory();
+            } else {
+                //set the name of the current in memory partition to the current partition
+                currentPartitonName = getPartitionName(partitionMaxLSN);
+            }
+            preparedForSearch = true;
+        }
+
+        public boolean containsEntityCommitForTxnId(long logLSN, TxnId txnId) throws IOException {
+            //if we don't have any partitions on disk, search only from memory
+            if (jobEntitCommitOnDiskPartitionsFiles.size() == 0) {
+                return cachedEntityCommitTxns.contains(txnId);
+            } else {
+                //get candidate partitions from disk
+                ArrayList<File> candidatePartitions = getCandidiatePartitions(logLSN);
+                for (File partition : candidatePartitions) {
+                    if (serachPartition(partition, txnId)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
+        /**
+         * @param logLSN
+         * @return partitions that have a max LSN > logLSN
+         */
+        public ArrayList<File> getCandidiatePartitions(long logLSN) {
+            ArrayList<File> candidiatePartitions = new ArrayList<File>();
+
+            for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
+                String partitionName = partition.getName();
+                //entity commit log must come after the update log, therefore, consider only partitions with max LSN > logLSN 
+                if (getPartitionMaxLSNFromName(partitionName) > logLSN) {
+                    candidiatePartitions.add(partition);
+                }
+            }
+
+            return candidiatePartitions;
+        }
+
+        public void clear() {
+            cachedEntityCommitTxns.clear();
+            for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
+                partition.delete();
+            }
+            jobEntitCommitOnDiskPartitionsFiles.clear();
+        }
+
+        private boolean serachPartition(File partition, TxnId txnId) throws IOException {
+            //load partition from disk if it is not  already in memory
+            if (!partition.getName().equals(currentPartitonName)) {
+                loadPartitionToMemory(partition, cachedEntityCommitTxns);
+                currentPartitonName = partition.getName();
+            }
+            return cachedEntityCommitTxns.contains(txnId);
+        }
+
+        private String getPartitionName(long maxLSN) {
+            return jobId + PARTITION_FILE_NAME_SEPARATOR + maxLSN;
+        }
+
+        private long getPartitionMaxLSNFromName(String partitionName) {
+            return Long.valueOf(partitionName.substring(partitionName.indexOf(PARTITION_FILE_NAME_SEPARATOR) + 1));
+        }
+
+        private void writeCurrentPartitionToDisk() throws IOException {
+            //if we don't have enough memory to allocate for this partition, we will ask recovery manager to free memory
+            if (needToFreeMemory()) {
+                freeJobsCachedEntities(jobId);
+            }
+            //allocate a buffer that can hold the current partition
+            ByteBuffer buffer = ByteBuffer.allocate(currentPartitionSize);
+            for (Iterator<TxnId> iterator = cachedEntityCommitTxns.iterator(); iterator.hasNext();) {
+                TxnId txnId = iterator.next();
+                //serialize the object and remove it from memory
+                txnId.serialize(buffer);
+                iterator.remove();
+            }
+            //name partition file based on job id and max lsn
+            File partitionFile = createJobRecoveryFile(jobId, getPartitionName(partitionMaxLSN));
+            //write file to disk
+            try (FileOutputStream fileOutputstream = new FileOutputStream(partitionFile, false);
+                    FileChannel fileChannel = fileOutputstream.getChannel()) {
+                buffer.flip();
+                while (buffer.hasRemaining()) {
+                    fileChannel.write(buffer);
+                }
+            }
+            jobEntitCommitOnDiskPartitionsFiles.add(partitionFile);
+        }
+
+        private void loadPartitionToMemory(File partition, Set<TxnId> partitionTxn) throws IOException {
+            partitionTxn.clear();
+            //if we don't have enough memory to a load partition, we will ask recovery manager to free memory
+            if (needToFreeMemory()) {
+                freeJobsCachedEntities(jobId);
+            }
+            ByteBuffer buffer = ByteBuffer.allocateDirect((int) partition.length());
+            //load partition to memory
+            try (InputStream is = new FileInputStream(partition)) {
+                int readByte;
+                while ((readByte = is.read()) != -1) {
+                    buffer.put((byte) readByte);
+                }
+            }
+            buffer.flip();
+            TxnId temp = null;
+            while (buffer.remaining() != 0) {
+                temp = TxnId.deserialize(buffer);
+                partitionTxn.add(temp);
+            }
+        }
+    }
 }
 
 class TxnId {
@@ -772,6 +979,9 @@
         }
     }
 
+    public TxnId() {
+    }
+
     private void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
         int readOffset = pkValue.getFieldStart(0);
         byte[] readBuffer = pkValue.getFieldData(0);
@@ -786,7 +996,7 @@
         this.pkHashValue = pkHashValue;
         this.tupleReferencePKValue = pkValue;
         this.pkSize = pkSize;
-        isByteArrayPKValue = false;
+        this.isByteArrayPKValue = false;
     }
 
     @Override
@@ -856,4 +1066,40 @@
         }
         return true;
     }
-}
\ No newline at end of file
+
+    public void serialize(ByteBuffer buffer) throws IOException {
+        buffer.putInt(jobId);
+        buffer.putInt(datasetId);
+        buffer.putInt(pkHashValue);
+        buffer.putInt(pkSize);
+        buffer.put((byte) (isByteArrayPKValue ? 1 : 0));
+        if (isByteArrayPKValue) {
+            buffer.put(byteArrayPKValue);
+        }
+    }
+
+    public static TxnId deserialize(ByteBuffer buffer) {
+        TxnId txnId = new TxnId();
+        txnId.jobId = buffer.getInt();
+        txnId.datasetId = buffer.getInt();
+        txnId.pkHashValue = buffer.getInt();
+        txnId.pkSize = buffer.getInt();
+        txnId.isByteArrayPKValue = (buffer.get() == 1);
+        if (txnId.isByteArrayPKValue) {
+            byte[] byteArrayPKValue = new byte[txnId.pkSize];
+            buffer.get(byteArrayPKValue);
+            txnId.byteArrayPKValue = byteArrayPKValue;
+        }
+        return txnId;
+    }
+
+    public int getCurrentSize() {
+        //job id, dataset id, pkHashValue, arraySize, isByteArrayPKValue
+        int size = JobId.BYTES + DatasetId.BYTES + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES;
+        //byte arraySize
+        if (isByteArrayPKValue && byteArrayPKValue != null) {
+            size += byteArrayPKValue.length;
+        }
+        return size;
+    }
+}