ASTERIXDB-969: Redesigned recovery analysis phase to spill to disk
Change-Id: Ide2b346c2ad498d7595e71bae890362c2143d301
Reviewed-on: https://asterix-gerrit.ics.uci.edu/458
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
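
The core of this change: the analysis phase no longer keeps every committed entity in memory. Entity
commits are cached per job and, once a job's cache exceeds a fixed budget, spilled to a partition file
named <jobId>_<maxLSN> under <log dir>/recovery_temp/<jobId>/; during redo, only partitions whose max
LSN is greater than the update record's LSN are searched. A minimal sketch of the idea, with invented
names (SpillSketch and its members are not part of this patch; the budget merely mirrors
MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE, and the real implementation is the JobEntityCommits class in
RecoveryManager.java below):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class SpillSketch {
        static final long BUDGET = 4L * 1024 * 1024; // mirrors MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE
        final Set<String> cached = new HashSet<>();
        final List<Path> partitions = new ArrayList<>();
        long cachedSize = 0;
        long maxLsn = 0;

        // called for every ENTITY_COMMIT record found while scanning the log forward
        void add(String entityKey, long lsn, int keySize) throws IOException {
            cached.add(entityKey);
            maxLsn = lsn; // LSNs only grow during a sequential scan
            cachedSize += keySize;
            if (cachedSize >= BUDGET) {
                spill();
            }
        }

        // write the cached keys to a partition file whose name ends with the partition's max LSN
        void spill() throws IOException {
            Path partition = Files.createTempFile("job_", "_" + maxLsn);
            Files.write(partition, String.join("\n", cached).getBytes());
            partitions.add(partition);
            cached.clear();
            cachedSize = 0;
        }

        // during redo: an entity commit always has a higher LSN than the update it commits,
        // so partitions whose max LSN is <= the update's LSN can be skipped entirely
        boolean contains(String entityKey, long updateLsn) throws IOException {
            if (cached.contains(entityKey)) {
                return true;
            }
            for (Path partition : partitions) {
                String name = partition.getFileName().toString();
                long partitionMaxLsn = Long.parseLong(name.substring(name.lastIndexOf('_') + 1));
                if (partitionMaxLsn > updateLsn && Files.readAllLines(partition).contains(entityKey)) {
                    return true;
                }
            }
            return false;
        }
    }

Unlike this sketch, JobEntityCommits serializes TxnId objects into a ByteBuffer sized by
TxnId.getCurrentSize(), reloads one partition at a time into its cached set when searching, and deletes
its partitions as soon as the owning job is found to be a winner (JOB_COMMIT) or recovery completes.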
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
index 79d45e4..4bcbada 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java
@@ -25,6 +25,11 @@
*
*/
private static final long serialVersionUID = 1L;
+ /**
+     * The number of bytes used to represent a {@link DatasetId} value.
+ */
+ public static final int BYTES = Integer.BYTES;
+
int id;
public DatasetId(int id) {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java
index 3c80e67..046487a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java
@@ -22,6 +22,10 @@
public class JobId implements Serializable {
private static final long serialVersionUID = 1L;
+ /**
+     * The number of bytes used to represent a {@link JobId} value.
+ */
+ public static final int BYTES = Integer.BYTES;
private int id;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 60e3097..a510e51 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -101,21 +101,19 @@
checksumGen = new CRC32();
}
- private final static int TYPE_LEN = Byte.SIZE/Byte.SIZE;
- private final static int JID_LEN = Integer.SIZE / Byte.SIZE;
- private final static int DSID_LEN = Integer.SIZE / Byte.SIZE;
- private final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE;
- private final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE;
+ private final static int TYPE_LEN = Byte.SIZE / Byte.SIZE;
+ public final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE;
+ public final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE;
private final static int PRVLSN_LEN = Long.SIZE / Byte.SIZE;
private final static int RSID_LEN = Long.SIZE / Byte.SIZE;
private final static int LOGRCD_SZ_LEN = Integer.SIZE / Byte.SIZE;
private final static int FLDCNT_LEN = Integer.SIZE / Byte.SIZE;
- private final static int NEWOP_LEN = Byte.SIZE/Byte.SIZE;
+ private final static int NEWOP_LEN = Byte.SIZE / Byte.SIZE;
private final static int NEWVALSZ_LEN = Integer.SIZE / Byte.SIZE;
private final static int CHKSUM_LEN = Long.SIZE / Byte.SIZE;
- private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JID_LEN;
- private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DSID_LEN + PKHASH_LEN + PKSZ_LEN;
+ private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JobId.BYTES;
+ private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN;
private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
@@ -142,11 +140,11 @@
buffer.putInt(newValueSize);
writeTuple(buffer, newValue, newValueSize);
}
-
+
if (logType == LogType.FLUSH) {
buffer.putInt(datasetId);
}
-
+
checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
buffer.putLong(checksum);
}
@@ -174,20 +172,19 @@
public RECORD_STATUS readLogRecord(ByteBuffer buffer) {
int beginOffset = buffer.position();
//first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
- if(buffer.remaining() < ALL_RECORD_HEADER_LEN) {
+ if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
logType = buffer.get();
jobId = buffer.getInt();
- if(logType != LogType.FLUSH)
- {
+ if (logType != LogType.FLUSH) {
if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
datasetId = -1;
PKHashValue = -1;
} else {
//attempt to read in the dsid, PK hash and PK length
- if(buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN){
+ if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
@@ -195,7 +192,7 @@
PKHashValue = buffer.getInt();
PKValueSize = buffer.getInt();
//attempt to read in the PK
- if(buffer.remaining() < PKValueSize){
+ if (buffer.remaining() < PKValueSize) {
buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
@@ -206,7 +203,7 @@
}
if (logType == LogType.UPDATE) {
//attempt to read in the previous LSN, log size, new value size, and new record type
- if(buffer.remaining() <UPDATE_LSN_HEADER + UPDATE_BODY_HEADER){
+ if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
@@ -216,7 +213,7 @@
fieldCnt = buffer.getInt();
newOp = buffer.get();
newValueSize = buffer.getInt();
- if(buffer.remaining() < newValueSize){
+ if (buffer.remaining() < newValueSize) {
buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
@@ -224,10 +221,9 @@
} else {
computeAndSetLogSize();
}
- }
- else{
+ } else {
computeAndSetLogSize();
- if(buffer.remaining() < DSID_LEN){
+ if (buffer.remaining() < DatasetId.BYTES) {
buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
@@ -235,7 +231,7 @@
resourceId = 0l;
}
//attempt to read checksum
- if(buffer.remaining() < CHKSUM_LEN){
+ if (buffer.remaining() < CHKSUM_LEN) {
buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
@@ -274,7 +270,7 @@
this.PKHashValue = -1;
computeAndSetLogSize();
}
-
+
public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker) {
this.logType = LogType.FLUSH;
this.jobId = -1;
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 82ad32d..fec04e4 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -24,9 +24,15 @@
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -45,17 +51,21 @@
import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogReader;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -83,6 +93,11 @@
private final LogManager logMgr;
private final int checkpointHistory;
private final long SHARP_CHECKPOINT_LSN = -1;
+ private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
+ private static final long MEGABYTE = 1024L * 1024L;
+ private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
+    private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //4MB
+
/**
* A file at a known location that contains the LSN of the last log record
* traversed doing a successful checkpoint.
@@ -105,8 +120,7 @@
* not supported, yet.
*/
public SystemState getSystemState() throws ACIDException {
-
- //#. read checkpoint file
+ //read checkpoint file
CheckpointObject checkpointObject = null;
try {
checkpointObject = readCheckpoint();
@@ -143,6 +157,8 @@
}
public void startRecovery(boolean synchronous) throws IOException, ACIDException {
+ //delete any recovery files from previous failed recovery attempts
+ deleteRecoveryTemporaryFiles();
int updateLogCount = 0;
int entityCommitLogCount = 0;
@@ -152,18 +168,13 @@
int jobId = -1;
state = SystemState.RECOVERING;
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("[RecoveryMgr] starting recovery ...");
- }
+ LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ...");
Set<Integer> winnerJobSet = new HashSet<Integer>();
- Map<Integer, Set<TxnId>> jobId2WinnerEntitiesMap = new HashMap<Integer, Set<TxnId>>();
- //winnerEntity is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId>
- Set<TxnId> winnerEntitySet = null;
- TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
- TxnId winnerEntity = null;
+ jobId2WinnerEntitiesMap = new HashMap<>();
+ TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+ JobEntityCommits jobEntityWinners = null;
//#. read checkpoint file and set lowWaterMark where analysis and redo start
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
CheckpointObject checkpointObject = readCheckpoint();
@@ -177,215 +188,224 @@
// [ analysis phase ]
// - collect all committed Lsn
//-------------------------------------------------------------------------
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("[RecoveryMgr] in analysis phase");
- }
+ LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase");
//#. set log reader to the lowWaterMarkLsn
ILogReader logReader = logMgr.getLogReader(true);
- logReader.initializeScan(lowWaterMarkLSN);
- ILogRecord logRecord = logReader.next();
- while (logRecord != null) {
- if (IS_DEBUG_MODE) {
- LOGGER.info(logRecord.getLogRecordForDisplay());
- }
- //update max jobId
- if (logRecord.getJobId() > maxJobId) {
- maxJobId = logRecord.getJobId();
- }
- switch (logRecord.getLogType()) {
- case LogType.UPDATE:
- updateLogCount++;
- break;
- case LogType.JOB_COMMIT:
- winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
- jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
- jobCommitLogCount++;
- break;
- case LogType.ENTITY_COMMIT:
- jobId = logRecord.getJobId();
- winnerEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
- logRecord.getPKValue(), logRecord.getPKValueSize(), true);
- if (!jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
- winnerEntitySet = new HashSet<TxnId>();
- jobId2WinnerEntitiesMap.put(Integer.valueOf(jobId), winnerEntitySet);
- } else {
- winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
- }
- winnerEntitySet.add(winnerEntity);
- entityCommitLogCount++;
- break;
- case LogType.ABORT:
- abortLogCount++;
- break;
- case LogType.FLUSH:
- break;
- default:
- throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
- }
+ ILogRecord logRecord = null;
+ try {
+ logReader.initializeScan(lowWaterMarkLSN);
logRecord = logReader.next();
- }
-
- //-------------------------------------------------------------------------
- // [ redo phase ]
- // - redo if
- // 1) The TxnId is committed && --> guarantee durability
- // 2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
- //-------------------------------------------------------------------------
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("[RecoveryMgr] in redo phase");
- }
- long resourceId;
- long maxDiskLastLsn;
- long LSN = -1;
- ILSMIndex index = null;
- LocalResource localResource = null;
- ILocalResourceMetadata localResourceMetadata = null;
- Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
- boolean foundWinner = false;
-
- //#. get indexLifeCycleManager
- IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
- IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
- ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
-
- Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
- .loadAndGetAllResources();
- //#. set log reader to the lowWaterMarkLsn again.
- logReader.initializeScan(lowWaterMarkLSN);
- logRecord = logReader.next();
- while (logRecord != null) {
- if (IS_DEBUG_MODE) {
- LOGGER.info(logRecord.getLogRecordForDisplay());
- }
- LSN = logRecord.getLSN();
- jobId = logRecord.getJobId();
- foundWinner = false;
- switch (logRecord.getLogType()) {
- case LogType.UPDATE:
- if (winnerJobSet.contains(Integer.valueOf(jobId))) {
- foundWinner = true;
- } else if (jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
- winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
- tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
- logRecord.getPKValue(), logRecord.getPKValueSize());
- if (winnerEntitySet.contains(tempKeyTxnId)) {
- foundWinner = true;
+ while (logRecord != null) {
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(logRecord.getLogRecordForDisplay());
+ }
+ //update max jobId
+ if (logRecord.getJobId() > maxJobId) {
+ maxJobId = logRecord.getJobId();
+ }
+ switch (logRecord.getLogType()) {
+ case LogType.UPDATE:
+ updateLogCount++;
+ break;
+ case LogType.JOB_COMMIT:
+ jobId = logRecord.getJobId();
+ winnerJobSet.add(jobId);
+ if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+ jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                            //clear the winner entities so that any spilled files are deleted as well
+ jobEntityWinners.clear();
+ jobId2WinnerEntitiesMap.remove(jobId);
}
- }
- if (foundWinner) {
- resourceId = logRecord.getResourceId();
- localResource = resourcesMap.get(resourceId);
-
- /*******************************************************************
- * [Notice]
- * -> Issue
- * Delete index may cause a problem during redo.
- * The index operation to be redone couldn't be redone because the corresponding index
- * may not exist in NC due to the possible index drop DDL operation.
- * -> Approach
- * Avoid the problem during redo.
- * More specifically, the problem will be detected when the localResource of
- * the corresponding index is retrieved, which will end up with 'null'.
- * If null is returned, then just go and process the next
- * log record.
- *******************************************************************/
- if (localResource == null) {
- logRecord = logReader.next();
- continue;
- }
- /*******************************************************************/
-
- //get index instance from IndexLifeCycleManager
- //if index is not registered into IndexLifeCycleManager,
- //create the index using LocalMetadata stored in LocalResourceRepository
- index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName());
- if (index == null) {
- //#. create index instance and register to indexLifeCycleManager
- localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
- index = localResourceMetadata.createIndexInstance(appRuntimeContext,
- localResource.getResourceName(), localResource.getPartition());
- indexLifecycleManager.register(localResource.getResourceName(), index);
- indexLifecycleManager.open(localResource.getResourceName());
-
- //#. get maxDiskLastLSN
- ILSMIndex lsmIndex = (ILSMIndex) index;
- maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
- .getComponentLSN(lsmIndex.getImmutableComponents());
-
- //#. set resourceId and maxDiskLastLSN to the map
- resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
+ jobCommitLogCount++;
+ break;
+ case LogType.ENTITY_COMMIT:
+ jobId = logRecord.getJobId();
+ if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
+ jobEntityWinners = new JobEntityCommits(jobId);
+ if (needToFreeMemory()) {
+                                //if we don't have enough memory for one more job, force all jobs to spill their cached entities to disk.
+                                //This can only happen when there are many jobs, each with a small number of records and no job commit yet.
+ freeJobsCachedEntities(jobId);
+ }
+ jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
} else {
- maxDiskLastLsn = resourceId2MaxLSNMap.get(Long.valueOf(resourceId));
+ jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
}
-
- if (LSN > maxDiskLastLsn) {
- redo(logRecord);
- redoCount++;
- }
- }
- break;
-
- case LogType.JOB_COMMIT:
- case LogType.ENTITY_COMMIT:
- case LogType.ABORT:
- case LogType.FLUSH:
-
- //do nothing
- break;
-
- default:
- throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+ jobEntityWinners.add(logRecord);
+ entityCommitLogCount++;
+ break;
+ case LogType.ABORT:
+ abortLogCount++;
+ break;
+ case LogType.FLUSH:
+ break;
+ default:
+ throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+ }
+ logRecord = logReader.next();
}
+
+            //analysis is done; prepare winners for search, which flushes anything remaining in memory to disk.
+ for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
+ winners.prepareForSearch();
+ }
+ //-------------------------------------------------------------------------
+ // [ redo phase ]
+ // - redo if
+ // 1) The TxnId is committed && --> guarantee durability
+ // 2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
+ //-------------------------------------------------------------------------
+ LOGGER.info("[RecoveryMgr] in redo phase");
+
+ long resourceId;
+ long maxDiskLastLsn;
+ long LSN = -1;
+ ILSMIndex index = null;
+ LocalResource localResource = null;
+ ILocalResourceMetadata localResourceMetadata = null;
+ Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+ boolean foundWinner = false;
+
+ IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+ IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
+ ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
+ Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
+ .loadAndGetAllResources();
+
+ //set log reader to the lowWaterMarkLsn again.
+ logReader.initializeScan(lowWaterMarkLSN);
logRecord = logReader.next();
- }
+ while (logRecord != null) {
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(logRecord.getLogRecordForDisplay());
+ }
+ LSN = logRecord.getLSN();
+ jobId = logRecord.getJobId();
+ foundWinner = false;
+ switch (logRecord.getLogType()) {
+ case LogType.UPDATE:
+ if (winnerJobSet.contains(jobId)) {
+ foundWinner = true;
+ } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+ jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+ tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize());
+ if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
+ foundWinner = true;
+ }
+ }
+ if (foundWinner) {
+ resourceId = logRecord.getResourceId();
+ localResource = resourcesMap.get(resourceId);
+ /*******************************************************************
+ * [Notice]
+ * -> Issue
+ * Delete index may cause a problem during redo.
+ * The index operation to be redone couldn't be redone because the corresponding index
+ * may not exist in NC due to the possible index drop DDL operation.
+ * -> Approach
+ * Avoid the problem during redo.
+ * More specifically, the problem will be detected when the localResource of
+ * the corresponding index is retrieved, which will end up with 'null'.
+ * If null is returned, then just go and process the next
+ * log record.
+ *******************************************************************/
+ if (localResource == null) {
+ logRecord = logReader.next();
+ continue;
+ }
+ /*******************************************************************/
- //close all indexes
- Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
- for (long r : resourceIdList) {
- indexLifecycleManager.close(resourcesMap.get(r).getResourceName());
- }
+ //get index instance from IndexLifeCycleManager
+ //if index is not registered into IndexLifeCycleManager,
+ //create the index using LocalMetadata stored in LocalResourceRepository
+ index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName());
+ if (index == null) {
+ //#. create index instance and register to indexLifeCycleManager
+ localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+ index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+ localResource.getResourceName(), localResource.getPartition());
+ indexLifecycleManager.register(localResource.getResourceName(), index);
+ indexLifecycleManager.open(localResource.getResourceName());
- logReader.close();
+ //#. get maxDiskLastLSN
+ ILSMIndex lsmIndex = (ILSMIndex) index;
+ maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+ .getComponentLSN(lsmIndex.getImmutableComponents());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("[RecoveryMgr] recovery is completed.");
- LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
- + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + "/"
- + redoCount);
+ //#. set resourceId and maxDiskLastLSN to the map
+ resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
+ } else {
+ maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ }
+
+ if (LSN > maxDiskLastLsn) {
+ redo(logRecord);
+ redoCount++;
+ }
+ }
+ break;
+ case LogType.JOB_COMMIT:
+ case LogType.ENTITY_COMMIT:
+ case LogType.ABORT:
+ case LogType.FLUSH:
+ //do nothing
+ break;
+ default:
+ throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+ }
+ logRecord = logReader.next();
+ }
+
+ //close all indexes
+ Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
+ for (long r : resourceIdList) {
+ indexLifecycleManager.close(resourcesMap.get(r).getResourceName());
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("[RecoveryMgr] recovery is completed.");
+ LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
+ + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount
+ + "/" + redoCount);
+ }
+ } finally {
+ logReader.close();
+ //delete any recovery files after completing recovery
+ deleteRecoveryTemporaryFiles();
}
}
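+    //returns true when the JVM's free heap is smaller than the per-job entity commit budget, i.e., cached entity commits should be spilled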
+ private static boolean needToFreeMemory() {
+ return Runtime.getRuntime().freeMemory() < MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE;
+ }
+
@Override
public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
throws ACIDException, HyracksDataException {
-
long minMCTFirstLSN;
boolean nonSharpCheckpointSucceeded = false;
- if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting sharp checkpoint ... ");
+ if (isSharpCheckpoint) {
+ LOGGER.log(Level.INFO, "Starting sharp checkpoint ... ");
}
TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
String logDir = logMgr.getLogManagerProperties().getLogDir();
- //#. get the filename of the previous checkpoint files which are about to be deleted
- // right after the new checkpoint file is written.
+ //get the filename of the previous checkpoint files which are about to be deleted
+ //right after the new checkpoint file is written.
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager) txnSubsystem
.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
- //#. flush all in-memory components if it is the sharp checkpoint
+ //flush all in-memory components if it is the sharp checkpoint
if (isSharpCheckpoint) {
-
datasetLifecycleManager.flushAllDatasets();
-
minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
} else {
-
minMCTFirstLSN = getMinFirstLSN();
-
if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) {
nonSharpCheckpointSucceeded = true;
} else {
@@ -400,7 +420,7 @@
FileOutputStream fos = null;
ObjectOutputStream oosToFos = null;
try {
- String fileName = getFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
+ String fileName = getCheckpointFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
fos = new FileOutputStream(fileName);
oosToFos = new ObjectOutputStream(fos);
oosToFos.writeObject(checkpointObject);
@@ -460,11 +480,8 @@
long firstLSN;
//the min first lsn can only be the current append or smaller
long minFirstLSN = logMgr.getAppendLSN();
-
if (openIndexList.size() > 0) {
-
for (IIndex index : openIndexList) {
-
AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
.getIOOperationCallback();
if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
@@ -473,23 +490,19 @@
}
}
}
-
return minFirstLSN;
}
private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
-
CheckpointObject checkpointObject = null;
- //#. read all checkpointObjects from the existing checkpoint files
+ //read all checkpointObjects from the existing checkpoint files
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
-
if (prevCheckpointFiles == null || prevCheckpointFiles.length == 0) {
throw new FileNotFoundException("Checkpoint file is not found");
}
List<CheckpointObject> checkpointObjectList = new ArrayList<CheckpointObject>();
-
for (File file : prevCheckpointFiles) {
FileInputStream fis = null;
ObjectInputStream oisFromFis = null;
@@ -528,7 +541,6 @@
private File[] getPreviousCheckpointFiles() {
String logDir = ((LogManager) txnSubsystem.getLogManager()).getLogManagerProperties().getLogDir();
-
File parentDir = new File(logDir);
FilenameFilter filter = new FilenameFilter() {
@@ -542,20 +554,61 @@
}
};
- File[] prevCheckpointFiles = parentDir.listFiles(filter);
-
- return prevCheckpointFiles;
+ return parentDir.listFiles(filter);
}
- private String getFileName(String baseDir, String suffix) {
-
+ private String getCheckpointFileName(String baseDir, String suffix) {
if (!baseDir.endsWith(System.getProperty("file.separator"))) {
baseDir += System.getProperty("file.separator");
}
-
return baseDir + CHECKPOINT_FILENAME_PREFIX + suffix;
}
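+    /**
+     * Creates a new, empty file named fileName under the recovery_temp/jobId directory
+     * (inside the log directory) to hold a spilled partition of the job's entity commits.
+     */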
+ private File createJobRecoveryFile(int jobId, String fileName) throws IOException {
+ String recoveryDirPath = getRecoveryDirPath();
+        Path jobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + jobId);
+        if (!Files.exists(jobRecoveryFolder)) {
+            Files.createDirectories(jobRecoveryFolder);
+        }
+
+        File jobRecoveryFile = new File(jobRecoveryFolder.toString() + File.separator + fileName);
+ if (!jobRecoveryFile.exists()) {
+ jobRecoveryFile.createNewFile();
+ } else {
+ throw new IOException("File: " + fileName + " for job id(" + jobId + ") already exists");
+ }
+
+ return jobRecoveryFile;
+ }
+
+ private void deleteRecoveryTemporaryFiles() throws IOException {
+ String recoveryDirPath = getRecoveryDirPath();
+ Path recoveryFolderPath = Paths.get(recoveryDirPath);
+ if (Files.exists(recoveryFolderPath)) {
+ FileUtils.deleteDirectory(recoveryFolderPath.toFile());
+ }
+ }
+
+ private String getRecoveryDirPath() {
+ String logDir = logMgr.getLogManagerProperties().getLogDir();
+ if (!logDir.endsWith(File.separator)) {
+ logDir += File.separator;
+ }
+
+ return logDir + RECOVERY_FILES_DIR_NAME;
+ }
+
+ private void freeJobsCachedEntities(int requestingJobId) throws IOException {
+ if (jobId2WinnerEntitiesMap != null) {
+ for (Entry<Integer, JobEntityCommits> jobEntityCommits : jobId2WinnerEntitiesMap.entrySet()) {
+ //if the job is not the requester, free its memory
+ if (jobEntityCommits.getKey() != requestingJobId) {
+ jobEntityCommits.getValue().spillToDiskAndfreeMemory();
+ }
+ }
+ }
+ }
+
/**
* Rollback a transaction
*
@@ -563,136 +616,121 @@
*/
@Override
public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException {
- Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
- TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
-
- int updateLogCount = 0;
- int entityCommitLogCount = 0;
- int jobId = -1;
int abortedJobId = txnContext.getJobId().getId();
- long currentLSN = -1;
- TxnId loserEntity = null;
-
// Obtain the first/last log record LSNs written by the Job
long firstLSN = txnContext.getFirstLSN();
long lastLSN = txnContext.getLastLSN();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
- }
+        LOGGER.log(Level.INFO, "rolling back transaction log records from " + firstLSN + " to " + lastLSN);
// check if the transaction actually wrote some logs.
if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" no need to roll back as there were no operations by the transaction "
- + txnContext.getJobId());
- }
+ LOGGER.log(Level.INFO,
+ "no need to roll back as there were no operations by the transaction " + txnContext.getJobId());
return;
}
// While reading log records from firstLsn to lastLsn, collect uncommitted txn's Lsns
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
- }
+ LOGGER.log(Level.INFO, "collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
+
+ Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<TxnId, List<Long>>();
+ TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+ int updateLogCount = 0;
+ int entityCommitLogCount = 0;
+ int logJobId = -1;
+ long currentLSN = -1;
+ TxnId loserEntity = null;
List<Long> undoLSNSet = null;
+
ILogReader logReader = logMgr.getLogReader(false);
- logReader.initializeScan(firstLSN);
- ILogRecord logRecord = null;
-
- while (currentLSN < lastLSN) {
- logRecord = logReader.next();
- if (logRecord == null) {
- break;
- } else {
- if (IS_DEBUG_MODE) {
- LOGGER.info(logRecord.getLogRecordForDisplay());
- }
- currentLSN = logRecord.getLSN();
- }
- jobId = logRecord.getJobId();
- if (jobId != abortedJobId) {
- continue;
- }
- tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(),
- logRecord.getPKValueSize());
- switch (logRecord.getLogType()) {
- case LogType.UPDATE:
- undoLSNSet = loserTxnTable.get(tempKeyTxnId);
- if (undoLSNSet == null) {
- loserEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
- logRecord.getPKValue(), logRecord.getPKValueSize(), true);
- undoLSNSet = new LinkedList<Long>();
- loserTxnTable.put(loserEntity, undoLSNSet);
- }
- undoLSNSet.add(Long.valueOf(currentLSN));
- updateLogCount++;
- if (IS_DEBUG_MODE) {
- LOGGER.info("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
- + tempKeyTxnId);
- }
- break;
-
- case LogType.JOB_COMMIT:
- throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
-
- case LogType.ENTITY_COMMIT:
- undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
- if (undoLSNSet == null) {
- undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
- }
- entityCommitLogCount++;
- if (IS_DEBUG_MODE) {
- LOGGER.info("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
- + tempKeyTxnId);
- }
- break;
-
- case LogType.ABORT:
- case LogType.FLUSH:
- //ignore
- break;
-
- default:
- throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
- }
- }
- if (currentLSN != lastLSN) {
- throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
- + ") during abort( " + txnContext.getJobId() + ")");
- }
-
- //undo loserTxn's effect
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" undoing loser transaction's effect");
- }
-
- Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
- int undoCount = 0;
- while (iter.hasNext()) {
- //TODO
- //Sort the lsns in order to undo in one pass.
-
- Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
- undoLSNSet = loserTxn.getValue();
-
- for (long undoLSN : undoLSNSet) {
- //here, all the log records are UPDATE type. So, we don't need to check the type again.
- //read the corresponding log record to be undone.
- logRecord = logReader.read(undoLSN);
+ try {
+ logReader.initializeScan(firstLSN);
+ ILogRecord logRecord = null;
+ while (currentLSN < lastLSN) {
+ logRecord = logReader.next();
if (logRecord == null) {
- throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
+ break;
+ } else {
+ currentLSN = logRecord.getLSN();
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(logRecord.getLogRecordForDisplay());
+ }
}
- if (IS_DEBUG_MODE) {
- LOGGER.info(logRecord.getLogRecordForDisplay());
+ logJobId = logRecord.getJobId();
+ if (logJobId != abortedJobId) {
+ continue;
}
- undo(logRecord);
- undoCount++;
+ tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize());
+ switch (logRecord.getLogType()) {
+ case LogType.UPDATE:
+ undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId);
+ if (undoLSNSet == null) {
+ loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+ undoLSNSet = new LinkedList<Long>();
+ jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet);
+ }
+ undoLSNSet.add(currentLSN);
+ updateLogCount++;
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+ + tempKeyTxnId);
+ }
+ break;
+ case LogType.ENTITY_COMMIT:
+ jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
+ entityCommitLogCount++;
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+ + tempKeyTxnId);
+ }
+ break;
+ case LogType.JOB_COMMIT:
+ throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
+ case LogType.ABORT:
+ case LogType.FLUSH:
+ //ignore
+ break;
+ default:
+ throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+ }
}
- }
- logReader.close();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" undone loser transaction's effect");
- LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/"
- + entityCommitLogCount + "/" + undoCount);
+ if (currentLSN != lastLSN) {
+ throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
+ + ") during abort( " + txnContext.getJobId() + ")");
+ }
+
+ //undo loserTxn's effect
+ LOGGER.log(Level.INFO, "undoing loser transaction's effect");
+
+ //TODO sort loser entities by smallest LSN to undo in one pass.
+ Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
+ int undoCount = 0;
+ while (iter.hasNext()) {
+ Map.Entry<TxnId, List<Long>> loserEntity2LSNsMap = iter.next();
+ undoLSNSet = loserEntity2LSNsMap.getValue();
+ for (long undoLSN : undoLSNSet) {
+ //here, all the log records are UPDATE type. So, we don't need to check the type again.
+ //read the corresponding log record to be undone.
+ logRecord = logReader.read(undoLSN);
+ if (logRecord == null) {
+ throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
+ }
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(logRecord.getLogRecordForDisplay());
+ }
+ undo(logRecord);
+ undoCount++;
+ }
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("undone loser transaction's effect");
+ LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/"
+ + entityCommitLogCount + "/" + undoCount);
+ }
+ } finally {
+ logReader.close();
}
}
@@ -746,6 +784,175 @@
throw new IllegalStateException("Failed to redo", e);
}
}
+
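+    /**
+     * Holds the entity commit records observed for a single job during the analysis phase.
+     * Commits are cached in memory and spilled to disk partitions (files named jobId_maxLSN)
+     * whenever the cached size exceeds MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE.
+     */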
+ private class JobEntityCommits {
+ private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
+ private final int jobId;
+ private final Set<TxnId> cachedEntityCommitTxns = new HashSet<TxnId>();
+ private final List<File> jobEntitCommitOnDiskPartitionsFiles = new ArrayList<File>();
+        //a flag indicating whether all the commits for this job have been added.
+ private boolean preparedForSearch = false;
+ private TxnId winnerEntity = null;
+ private int currentPartitionSize = 0;
+ private long partitionMaxLSN = 0;
+ private String currentPartitonName;
+
+ public JobEntityCommits(int jobId) {
+ this.jobId = jobId;
+ }
+
+ public void add(ILogRecord logRecord) throws IOException {
+ if (preparedForSearch) {
+ throw new IOException("Cannot add new entity commits after preparing for search.");
+ }
+ winnerEntity = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+ cachedEntityCommitTxns.add(winnerEntity);
+            //since the log file is read sequentially, LSNs are always increasing
+ partitionMaxLSN = logRecord.getLSN();
+ currentPartitionSize += winnerEntity.getCurrentSize();
+            //if the current partition exceeds the memory budget, spill it to disk and free the memory
+ if (currentPartitionSize >= MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE) {
+ spillToDiskAndfreeMemory();
+ }
+ }
+
+ public void spillToDiskAndfreeMemory() throws IOException {
+ if (cachedEntityCommitTxns.size() > 0) {
+ if (!preparedForSearch) {
+ writeCurrentPartitionToDisk();
+ }
+ cachedEntityCommitTxns.clear();
+ partitionMaxLSN = 0;
+ currentPartitionSize = 0;
+ currentPartitonName = "";
+ }
+ }
+
+ /**
+ * Call this method when no more entity commits will be added to this job.
+ *
+ * @throws IOException
+ */
+ public void prepareForSearch() throws IOException {
+            //if anything is left in memory, spill it to disk before searching the other partitions.
+            //However, if nothing has been spilled to disk yet, searches will use memory only.
+ if (jobEntitCommitOnDiskPartitionsFiles.size() > 0) {
+ spillToDiskAndfreeMemory();
+ } else {
+                //nothing was spilled; name the in-memory cache as the current partition using its max LSN
+ currentPartitonName = getPartitionName(partitionMaxLSN);
+ }
+ preparedForSearch = true;
+ }
+
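+        /**
+         * Returns true if an entity commit for txnId was collected during analysis.
+         * Only partitions whose max LSN is greater than logLSN need to be searched.
+         */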
+ public boolean containsEntityCommitForTxnId(long logLSN, TxnId txnId) throws IOException {
+ //if we don't have any partitions on disk, search only from memory
+ if (jobEntitCommitOnDiskPartitionsFiles.size() == 0) {
+ return cachedEntityCommitTxns.contains(txnId);
+ } else {
+ //get candidate partitions from disk
+ ArrayList<File> candidatePartitions = getCandidiatePartitions(logLSN);
+ for (File partition : candidatePartitions) {
+ if (serachPartition(partition, txnId)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param logLSN
+ * @return partitions that have a max LSN > logLSN
+ */
+ public ArrayList<File> getCandidiatePartitions(long logLSN) {
+ ArrayList<File> candidiatePartitions = new ArrayList<File>();
+
+ for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
+ String partitionName = partition.getName();
+                //an entity commit log record always comes after its update log records; therefore, only partitions with max LSN > logLSN are candidates
+ if (getPartitionMaxLSNFromName(partitionName) > logLSN) {
+ candidiatePartitions.add(partition);
+ }
+ }
+
+ return candidiatePartitions;
+ }
+
+ public void clear() {
+ cachedEntityCommitTxns.clear();
+ for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
+ partition.delete();
+ }
+ jobEntitCommitOnDiskPartitionsFiles.clear();
+ }
+
+ private boolean serachPartition(File partition, TxnId txnId) throws IOException {
+ //load partition from disk if it is not already in memory
+ if (!partition.getName().equals(currentPartitonName)) {
+ loadPartitionToMemory(partition, cachedEntityCommitTxns);
+ currentPartitonName = partition.getName();
+ }
+ return cachedEntityCommitTxns.contains(txnId);
+ }
+
+ private String getPartitionName(long maxLSN) {
+ return jobId + PARTITION_FILE_NAME_SEPARATOR + maxLSN;
+ }
+
+ private long getPartitionMaxLSNFromName(String partitionName) {
+ return Long.valueOf(partitionName.substring(partitionName.indexOf(PARTITION_FILE_NAME_SEPARATOR) + 1));
+ }
+
+ private void writeCurrentPartitionToDisk() throws IOException {
+            //if there is not enough memory to allocate for this partition, ask the recovery manager to free memory
+ if (needToFreeMemory()) {
+ freeJobsCachedEntities(jobId);
+ }
+ //allocate a buffer that can hold the current partition
+ ByteBuffer buffer = ByteBuffer.allocate(currentPartitionSize);
+ for (Iterator<TxnId> iterator = cachedEntityCommitTxns.iterator(); iterator.hasNext();) {
+ TxnId txnId = iterator.next();
+ //serialize the object and remove it from memory
+ txnId.serialize(buffer);
+ iterator.remove();
+ }
+ //name partition file based on job id and max lsn
+ File partitionFile = createJobRecoveryFile(jobId, getPartitionName(partitionMaxLSN));
+ //write file to disk
+ try (FileOutputStream fileOutputstream = new FileOutputStream(partitionFile, false);
+ FileChannel fileChannel = fileOutputstream.getChannel()) {
+ buffer.flip();
+ while (buffer.hasRemaining()) {
+ fileChannel.write(buffer);
+ }
+ }
+ jobEntitCommitOnDiskPartitionsFiles.add(partitionFile);
+ }
+
+ private void loadPartitionToMemory(File partition, Set<TxnId> partitionTxn) throws IOException {
+ partitionTxn.clear();
+            //if there is not enough memory to load a partition, ask the recovery manager to free memory
+ if (needToFreeMemory()) {
+ freeJobsCachedEntities(jobId);
+ }
+ ByteBuffer buffer = ByteBuffer.allocateDirect((int) partition.length());
+ //load partition to memory
+ try (InputStream is = new FileInputStream(partition)) {
+ int readByte;
+ while ((readByte = is.read()) != -1) {
+ buffer.put((byte) readByte);
+ }
+ }
+ buffer.flip();
+ TxnId temp = null;
+ while (buffer.remaining() != 0) {
+ temp = TxnId.deserialize(buffer);
+ partitionTxn.add(temp);
+ }
+ }
+ }
}
class TxnId {
@@ -772,6 +979,9 @@
}
}
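+    //default constructor used when deserializing a TxnId from a spilled partition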
+ public TxnId() {
+ }
+
private void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
int readOffset = pkValue.getFieldStart(0);
byte[] readBuffer = pkValue.getFieldData(0);
@@ -786,7 +996,7 @@
this.pkHashValue = pkHashValue;
this.tupleReferencePKValue = pkValue;
this.pkSize = pkSize;
- isByteArrayPKValue = false;
+ this.isByteArrayPKValue = false;
}
@Override
@@ -856,4 +1066,40 @@
}
return true;
}
-}
\ No newline at end of file
+
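+    /**
+     * Writes jobId, datasetId, pkHashValue, pkSize, and (when present) the PK value bytes to the buffer.
+     */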
+ public void serialize(ByteBuffer buffer) throws IOException {
+ buffer.putInt(jobId);
+ buffer.putInt(datasetId);
+ buffer.putInt(pkHashValue);
+ buffer.putInt(pkSize);
+ buffer.put((byte) (isByteArrayPKValue ? 1 : 0));
+ if (isByteArrayPKValue) {
+ buffer.put(byteArrayPKValue);
+ }
+ }
+
+ public static TxnId deserialize(ByteBuffer buffer) {
+ TxnId txnId = new TxnId();
+ txnId.jobId = buffer.getInt();
+ txnId.datasetId = buffer.getInt();
+ txnId.pkHashValue = buffer.getInt();
+ txnId.pkSize = buffer.getInt();
+ txnId.isByteArrayPKValue = (buffer.get() == 1);
+ if (txnId.isByteArrayPKValue) {
+ byte[] byteArrayPKValue = new byte[txnId.pkSize];
+ buffer.get(byteArrayPKValue);
+ txnId.byteArrayPKValue = byteArrayPKValue;
+ }
+ return txnId;
+ }
+
+ public int getCurrentSize() {
+        //fixed-size part: job id, dataset id, pk hash value, pk size, and the isByteArrayPKValue flag
+ int size = JobId.BYTES + DatasetId.BYTES + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES;
+        //plus the length of the PK value byte array
+ if (isByteArrayPKValue && byteArrayPKValue != null) {
+ size += byteArrayPKValue.length;
+ }
+ return size;
+ }
+}