[ASTERIXDB-1708][TX] Prevent log deletion during scan
Right now there is a potential for a soft checkpoint to delete a
log file that is about to be read as part of a transaction rollback.
This patch stops the soft checkpoint from proceeding if a rollback
is about to take place and vice-versa.
Change-Id: Icff1a520af24c8fac8e5836cdbf46425b78b1260
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2508
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4a2cf2d..4b14a9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -59,6 +59,7 @@
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -104,6 +105,7 @@
private SystemState state;
private final INCServiceContext serviceCtx;
private final INcApplicationContext appCtx;
+ private static final TxnId recoveryTxnId = new TxnId(-1);
public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) {
this.serviceCtx = serviceCtx;
@@ -505,21 +507,24 @@
}
@Override
- public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException {
- long minLSN = getPartitionsMinLSN(partitions);
- long readableSmallestLSN = logMgr.getReadableSmallestLSN();
- if (minLSN < readableSmallestLSN) {
- minLSN = readableSmallestLSN;
- }
-
+ public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush)
+ throws HyracksDataException {
//replay logs > minLSN that belong to these partitions
try {
+ checkpointManager.secure(recoveryTxnId);
+ long minLSN = getPartitionsMinLSN(partitions);
+ long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+ if (minLSN < readableSmallestLSN) {
+ minLSN = readableSmallestLSN;
+ }
replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN);
if (flush) {
appCtx.getDatasetLifecycleManager().flushAllDatasets();
}
} catch (IOException | ACIDException e) {
throw HyracksDataException.create(e);
+ } finally {
+ checkpointManager.completed(recoveryTxnId);
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 91d98e5..418282e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -28,16 +28,19 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.data.gen.TupleGenerator;
import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.RecoveryManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.ICheckpointManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -50,20 +53,23 @@
import org.apache.asterix.test.common.TestHelper;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.util.StorageUtil;
-import org.apache.hyracks.util.StorageUtil.StorageUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
public class CheckpointingTest {
@@ -88,6 +94,8 @@
private static final String DATASET_NAME = "TestDS";
private static final String DATA_TYPE_NAME = "DUMMY";
private static final String NODE_GROUP_NAME = "DEFAULT";
+ private volatile boolean threadException = false;
+ private Throwable exception = null;
@Before
public void setUp() throws Exception {
@@ -128,7 +136,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- IRecoveryManager recoveryManager = nc.getTransactionSubsystem().getRecoveryManager();
+ RecoveryManager recoveryManager = (RecoveryManager) nc.getTransactionSubsystem().getRecoveryManager();
ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
// Number of log files after node startup should be one
@@ -178,20 +186,70 @@
/*
* At this point, the low-water mark is not in the initialLowWaterMarkFileId, so
- * a checkpoint should delete it.
+ * a checkpoint should delete it. We will also start a second
+ * job to ensure that the checkpointing coexists peacefully
+ * with other concurrent readers of the log that request
+ * deletions to be witheld
*/
- checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN());
- // Validate initialLowWaterMarkFileId was deleted
- for (Long fileId : logManager.getLogFileIds()) {
- Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
- }
+ JobId jobId2 = nc.newJobId();
+ IHyracksTaskContext ctx2 = nc.createTestContext(jobId2, 0, false);
+ nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+ // Prepare insert operation
+ LSMInsertDeleteOperatorNodePushable insertOp2 = nc.getInsertPipeline(ctx2, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+ insertOp2.open();
+ VSizeFrame frame2 = new VSizeFrame(ctx2);
+ FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2);
+ for (int i = 0; i < 4; i++) {
+ long lastCkpoint = recoveryManager.getMinFirstLSN();
+ long lastFileId = logManager.getLogFileId(lastCkpoint);
- if (tupleAppender.getTupleCount() > 0) {
- tupleAppender.write(insertOp, true);
+ checkpointManager.tryCheckpoint(lowWaterMarkLSN);
+ // Validate initialLowWaterMarkFileId was deleted
+ for (Long fileId : logManager.getLogFileIds()) {
+ Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
+ }
+
+ while (currentLowWaterMarkLogFileId == lastFileId) {
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender2, tuple, insertOp2);
+ lowWaterMarkLSN = recoveryManager.getMinFirstLSN();
+ currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN);
+ }
}
- insertOp.close();
- nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+ Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread th, Throwable ex) {
+ threadException = true;
+ exception = ex;
+ }
+ };
+
+ Thread t = new Thread(() -> {
+ TransactionManager spyTxnMgr = spy((TransactionManager) nc.getTransactionManager());
+ doAnswer((Answer) i -> {
+ stallAbortTxn(Thread.currentThread(), txnCtx, nc.getTransactionSubsystem(),
+ (TxnId) i.getArguments()[0]);
+ return null;
+ }).when(spyTxnMgr).abortTransaction(any(TxnId.class));
+
+ spyTxnMgr.abortTransaction(txnCtx.getTxnId());
+ });
+ t.setUncaughtExceptionHandler(h);
+ synchronized (t) {
+ t.start();
+ t.wait();
+ }
+ long lockedLSN = recoveryManager.getMinFirstLSN();
+ checkpointManager.tryCheckpoint(lockedLSN);
+ synchronized (t) {
+ t.notifyAll();
+ }
+ t.join();
+ if (threadException) {
+ throw exception;
+ }
} finally {
nc.deInit();
}
@@ -201,6 +259,32 @@
}
}
+ private void stallAbortTxn(Thread t, ITransactionContext txnCtx, ITransactionSubsystem txnSubsystem, TxnId txnId)
+ throws InterruptedException, HyracksDataException {
+
+ try {
+ if (txnCtx.isWriteTxn()) {
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
+ txnSubsystem.getLogManager().log(logRecord);
+ txnSubsystem.getCheckpointManager().secure(txnId);
+ synchronized (t) {
+ t.notifyAll();
+ t.wait();
+ }
+ txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+ txnCtx.setTxnState(ITransactionManager.ABORTED);
+ }
+ } catch (ACIDException | HyracksDataException e) {
+ String msg = "Could not complete rollback! System is in an inconsistent state";
+ throw new ACIDException(msg, e);
+ } finally {
+ txnCtx.complete();
+ txnSubsystem.getLockManager().releaseLocks(txnCtx);
+ txnSubsystem.getCheckpointManager().completed(txnId);
+ }
+ }
+
@Test
public void testCorruptedCheckpointFiles() {
try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index 9e7eb0d..e3cf8b8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -46,4 +46,19 @@
* @throws HyracksDataException
*/
long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException;
-}
\ No newline at end of file
+
+ /**
+ * Secures the current low-water mark until the transaction identified by {@code id} completes.
+ *
+ * @param id
+ * @throws HyracksDataException
+ */
+ void secure(TxnId id) throws HyracksDataException;
+
+ /**
+ * Notifies this {@link ICheckpointManager} that the transaction identified by {@code id} completed.
+ *
+ * @param id
+ */
+ void completed(TxnId id);
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 3ada608..736de07 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -76,7 +76,6 @@
private final String logFilePrefix;
private final MutableLong flushLSN;
private final String nodeId;
- private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>();
private final long logFileSize;
private final int logPageSize;
private final AtomicLong appendLSN;
@@ -407,24 +406,20 @@
/**
* At this point, any future LogReader should read from LSN >= checkpointLSN
*/
- synchronized (txnLogFileId2ReaderCount) {
- for (Long id : logFileIds) {
- /**
- * Stop deletion if:
- * The log file which contains the checkpointLSN has been reached.
- * The oldest log file being accessed by a LogReader has been reached.
- */
- if (id >= checkpointLSNLogFileID
- || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) {
- break;
- }
- //delete old log file
- File file = new File(getLogFilePath(id));
- file.delete();
- txnLogFileId2ReaderCount.remove(id);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Deleted log file " + file.getAbsolutePath());
- }
+ for (Long id : logFileIds) {
+ /**
+ * Stop deletion if:
+ * The log file which contains the checkpointLSN has been reached.
+ * The oldest log file being accessed by a LogReader has been reached.
+ */
+ if (id >= checkpointLSNLogFileID) {
+ break;
+ }
+ //delete old log file
+ File file = new File(getLogFilePath(id));
+ file.delete();
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Deleted log file " + file.getAbsolutePath());
}
}
}
@@ -450,7 +445,6 @@
}
private long deleteAllLogFiles() {
- txnLogFileId2ReaderCount.clear();
List<Long> logFileIds = getLogFileIds();
if (!logFileIds.isEmpty()) {
for (Long id : logFileIds) {
@@ -607,7 +601,6 @@
RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "r");
FileChannel newFileChannel = raf.getChannel();
TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, fileId * logFileSize);
- touchLogFile(fileId);
return logFile;
}
@@ -617,32 +610,6 @@
LOGGER.warn(() -> "Closing log file with id(" + logFileRef.getLogFileId() + ") with a closed channel.");
}
fileChannel.close();
- untouchLogFile(logFileRef.getLogFileId());
- }
-
- private void touchLogFile(long fileId) {
- synchronized (txnLogFileId2ReaderCount) {
- if (txnLogFileId2ReaderCount.containsKey(fileId)) {
- txnLogFileId2ReaderCount.put(fileId, txnLogFileId2ReaderCount.get(fileId) + 1);
- } else {
- txnLogFileId2ReaderCount.put(fileId, 1);
- }
- }
- }
-
- private void untouchLogFile(long fileId) {
- synchronized (txnLogFileId2ReaderCount) {
- if (txnLogFileId2ReaderCount.containsKey(fileId)) {
- int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1;
- if (newReaderCount < 0) {
- throw new IllegalStateException(
- "Invalid log file reader count (ID=" + fileId + ", count: " + newReaderCount + ")");
- }
- txnLogFileId2ReaderCount.put(fileId, newReaderCount);
- } else {
- throw new IllegalStateException("Trying to close log file id(" + fileId + ") which was not opened.");
- }
- }
}
/**
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index a541bd9..6efd0e5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -22,10 +22,15 @@
import org.apache.asterix.common.transactions.CheckpointProperties;
import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* An implementation of {@link ICheckpointManager} that defines the logic
* of checkpoints.
@@ -33,9 +38,12 @@
public class CheckpointManager extends AbstractCheckpointManager {
private static final Logger LOGGER = LogManager.getLogger();
+ private static final long NO_SECURED_LSN = -1l;
+ private final Map<TxnId, Long> securedLSNs;
public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
super(txnSubsystem, checkpointProperties);
+ securedLSNs = new HashMap<>();
}
/**
@@ -62,6 +70,10 @@
@Override
public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
LOGGER.info("Attemping soft checkpoint...");
+ final long minSecuredLSN = getMinSecuredLSN();
+ if (minSecuredLSN != NO_SECURED_LSN && checkpointTargetLSN >= minSecuredLSN) {
+ return minSecuredLSN;
+ }
final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
if (!checkpointSucceeded) {
@@ -77,4 +89,18 @@
}
return minFirstLSN;
}
-}
\ No newline at end of file
+
+ @Override
+ public synchronized void secure(TxnId id) throws HyracksDataException {
+ securedLSNs.put(id, txnSubsystem.getRecoveryManager().getMinFirstLSN());
+ }
+
+ @Override
+ public synchronized void completed(TxnId id) {
+ securedLSNs.remove(id);
+ }
+
+ private synchronized long getMinSecuredLSN() {
+ return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 76ecc63..c218dec 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -33,6 +33,7 @@
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.Level;
@@ -103,10 +104,11 @@
LogRecord logRecord = new LogRecord();
TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
txnSubsystem.getLogManager().log(logRecord);
+ txnSubsystem.getCheckpointManager().secure(txnId);
txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
txnCtx.setTxnState(ITransactionManager.ABORTED);
}
- } catch (ACIDException e) {
+ } catch (HyracksDataException e) {
String msg = "Could not complete rollback! System is in an inconsistent state";
if (LOGGER.isErrorEnabled()) {
LOGGER.log(Level.ERROR, msg, e);
@@ -116,6 +118,7 @@
txnCtx.complete();
txnSubsystem.getLockManager().releaseLocks(txnCtx);
txnCtxRepository.remove(txnCtx.getTxnId());
+ txnSubsystem.getCheckpointManager().completed(txnId);
}
}