[NO ISSUE][TX] Ensure Uncommitted Atomic Txns Not Flushed
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Make all metadata indexes modifications as
force modifications.
- Do not decrement ops of atomic transactions
until they fully commit or abort to prevent
flushing partial records.
- Do not schedule flush if a force modification
starts before the flush log is written to disk.
- Unify code path for completing operations
after commit/abort in op tracker.
- Remove unneeded update log commit notification.
- Add test case for failing flush due to force
modification.
Change-Id: If8d5df630f1d9119002ef91da5c282da18901acc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2456
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index 70e5f6e..0f6adf6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -19,8 +19,10 @@
package org.apache.asterix.test.metadata;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -31,19 +33,25 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.api.IMetadataIndex;
import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
+import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -245,6 +253,68 @@
}
}
+ @Test
+ public void failedFlushOnUncommittedMetadataTxn() throws Exception {
+ ICcApplicationContext ccAppCtx =
+ (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+ final MetadataProvider metadataProvider = new MetadataProvider(ccAppCtx, null);
+ final MetadataTransactionContext mdTxn = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxn);
+ final String nodeGroupName = "ng";
+ try {
+ final List<String> ngNodes = Collections.singletonList("asterix_nc1");
+ MetadataManager.INSTANCE.addNodegroup(mdTxn, new NodeGroup(nodeGroupName, ngNodes));
+ MetadataManager.INSTANCE.commitTransaction(mdTxn);
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ INcApplicationContext appCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+ IDatasetLifecycleManager dlcm = appCtx.getDatasetLifecycleManager();
+ dlcm.flushAllDatasets();
+ IMetadataIndex idx = MetadataPrimaryIndexes.NODEGROUP_DATASET;
+ DatasetInfo datasetInfo = dlcm.getDatasetInfo(idx.getDatasetId().getId());
+ AbstractLSMIndex index = (AbstractLSMIndex) appCtx.getDatasetLifecycleManager()
+ .getIndex(idx.getDatasetId().getId(), idx.getResourceId());
+ PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+ final MetadataTransactionContext mdTxn2 = MetadataManager.INSTANCE.beginTransaction();
+ int mutableComponentBeforeFlush = index.getCurrentMemoryComponentIndex();
+ int diskComponentsBeforeFlush = index.getDiskComponents().size();
+ // lock opTracker to prevent log flusher from triggering flush
+ synchronized (opTracker) {
+ opTracker.setFlushOnExit(true);
+ opTracker.flushIfNeeded();
+ Assert.assertTrue(opTracker.isFlushLogCreated());
+ metadataProvider.setMetadataTxnContext(mdTxn2);
+ // make sure the force operation will be processed
+ MetadataManager.INSTANCE.dropNodegroup(mdTxn2, nodeGroupName, false);
+ Assert.assertEquals(1, opTracker.getNumActiveOperations());
+ Assert.assertFalse(index.hasFlushRequestForCurrentMutableComponent());
+ // release opTracker lock now to allow log flusher to schedule the flush
+ InvokeUtil.runWithTimeout(() -> {
+ synchronized (opTracker) {
+ opTracker.wait(1000);
+ }
+ }, () -> !opTracker.isFlushLogCreated(), 10, TimeUnit.SECONDS);
+ }
+ // ensure flush failed to be scheduled
+ datasetInfo.waitForIO();
+ Assert.assertEquals(mutableComponentBeforeFlush, index.getCurrentMemoryComponentIndex());
+ Assert.assertEquals(diskComponentsBeforeFlush, index.getDiskComponents().size());
+ // after committing, the flush should be scheduled successfully
+ opTracker.setFlushOnExit(true);
+ MetadataManager.INSTANCE.commitTransaction(mdTxn2);
+ metadataProvider.getLocks().unlock();
+ InvokeUtil.runWithTimeout(() -> {
+ synchronized (opTracker) {
+ opTracker.wait(1000);
+ }
+ }, () -> !opTracker.isFlushLogCreated(), 10, TimeUnit.SECONDS);
+ // ensure flush completed successfully and the component was switched
+ datasetInfo.waitForIO();
+ Assert.assertNotEquals(mutableComponentBeforeFlush, index.getCurrentMemoryComponentIndex());
+ Assert.assertEquals(diskComponentsBeforeFlush + 1, index.getDiskComponents().size());
+ }
+
private void addDataset(ICcApplicationContext appCtx, Dataset source, int datasetPostfix, boolean abort)
throws Exception {
Dataset dataset = new Dataset(source.getDataverseName(), "ds_" + datasetPostfix, source.getDataverseName(),
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 3886115..47f7ae8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.transactions.AbstractOperationCallback;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
@@ -95,7 +96,7 @@
}
public synchronized void flushIfNeeded() throws HyracksDataException {
- if (numActiveOperations.get() == 0) {
+ if (canSafelyFlush()) {
flushIfRequested();
}
}
@@ -117,7 +118,8 @@
}
if (needsFlush || flushOnExit) {
- //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is scheduled.
+ // make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering
+ // them until the current flush is scheduled.
LSMComponentId primaryId = null;
for (ILSMIndex lsmIndex : indexes) {
ILSMOperationTracker opTracker = lsmIndex.getOperationTracker();
@@ -137,7 +139,7 @@
LogRecord logRecord = new LogRecord();
flushOnExit = false;
if (dsInfo.isDurable()) {
- /**
+ /*
* Generate a FLUSH log.
* Flush will be triggered when the log is written to disk by LogFlusher.
*/
@@ -158,18 +160,30 @@
//This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
- idGenerator.refresh();
- for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
- //get resource
- ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- //update resource lsn
- AbstractLSMIOOperationCallback ioOpCallback =
- (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
- ioOpCallback.updateLastLSN(logRecord.getLSN());
- //schedule flush after update
- accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
+ try {
+ if (!canSafelyFlush()) {
+ // if a force modification operation started before the flush is scheduled, this flush will fail
+ // and another attempt will be made when that operation completes. This is only expected for metadata
+ // datasets since they always use force modification
+ if (MetadataIndexImmutableProperties.isMetadataDataset(datasetID)) {
+ return;
+ }
+ throw new IllegalStateException("Operation started while index was pending scheduling a flush");
+ }
+ idGenerator.refresh();
+ for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
+ //get resource
+ ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ //update resource lsn
+ AbstractLSMIOOperationCallback ioOpCallback =
+ (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
+ ioOpCallback.updateLastLSN(logRecord.getLSN());
+ //schedule flush after update
+ accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
+ }
+ } finally {
+ flushLogCreated = false;
}
- flushLogCreated = false;
}
public int getNumActiveOperations() {
@@ -194,14 +208,6 @@
}
}
- public void cleanupNumActiveOperationsForAbortedJob(int numberOfActiveOperations) {
- numberOfActiveOperations *= -1;
- numActiveOperations.getAndAdd(numberOfActiveOperations);
- if (numActiveOperations.get() < 0) {
- throw new IllegalStateException("The number of active operations cannot be negative!");
- }
- }
-
public boolean isFlushOnExit() {
return flushOnExit;
}
@@ -218,4 +224,7 @@
return partition;
}
+ private boolean canSafelyFlush() {
+ return numActiveOperations.get() == 0;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index a3d5bc5..940535f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -23,9 +23,8 @@
/**
* A typical transaction lifecycle goes through the following steps:
- * 1. {@link ITransactionContext#register(long, ILSMIndex, IModificationOperationCallback, boolean)}
+ * 1. {@link ITransactionContext#register(long, int, ILSMIndex, IModificationOperationCallback, boolean)}
* 2. {@link ITransactionContext#beforeOperation(long)}
- * 3. {@link ITransactionContext#notifyUpdateCommitted(long)}
* 4. {@link ITransactionContext#notifyEntityCommitted}
* 5. {@link ITransactionContext#afterOperation(long)}
* 6. {@link ITransactionContext#complete()}
@@ -125,15 +124,6 @@
void beforeOperation(long resourceId);
/**
- * Called to notify the transaction that an update log belonging
- * to this transaction on index with {@code resourceId} has been
- * flushed to disk.
- *
- * @param resourceId
- */
- void notifyUpdateCommitted(long resourceId);
-
- /**
* Called to notify the transaction that an entity commit
* log belonging to this transaction has been flushed to
* disk.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index f81d7da..616d92b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -490,19 +490,17 @@
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
switch (op) {
case INSERT:
- indexAccessor.insert(tuple);
+ indexAccessor.forceInsert(tuple);
break;
case DELETE:
- indexAccessor.delete(tuple);
+ indexAccessor.forceDelete(tuple);
break;
case UPSERT:
- indexAccessor.upsert(tuple);
+ indexAccessor.forceUpsert(tuple);
break;
default:
throw new IllegalStateException("Unknown operation type: " + op);
}
- PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker();
- opTracker.flushIfNeeded(); // there is a window where the flush is not triggerred after an operation
} finally {
datasetLifecycleManager.close(resourceName);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index a630caa..21268e5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -226,10 +226,6 @@
if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) {
txnSubsystem.incrementEntityCommitCount();
}
- } else if (logRecord.getLogType() == LogType.UPDATE) {
- reusableTxnId.setId(logRecord.getTxnId());
- txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
- txnCtx.notifyUpdateCommitted(logRecord.getResourceId());
} else if (logRecord.getLogType() == LogType.JOB_COMMIT
|| logRecord.getLogType() == LogType.ABORT) {
notifyJobTermination();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index b3d5e49..95cabf9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -107,8 +107,8 @@
@Override
public void complete() {
try {
- if (txnState.get() == ITransactionManager.ABORTED) {
- cleanupForAbort();
+ if (isWriteTxn()) {
+ cleanup();
}
} finally {
synchronized (txnOpTrackers) {
@@ -141,5 +141,5 @@
return sb.toString();
}
- protected abstract void cleanupForAbort();
+ protected abstract void cleanup();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 219cf07..079e99a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -22,8 +22,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -57,16 +57,6 @@
}
@Override
- public void notifyUpdateCommitted(long resourceId) {
- try {
- opTrackers.get(resourceId).completeOperation(null, LSMOperationType.MODIFICATION, null,
- callbacks.get(resourceId));
- } catch (HyracksDataException e) {
- throw new ACIDException(e);
- }
- }
-
- @Override
public void notifyEntityCommitted(int partition) {
throw new IllegalStateException("Unexpected entity commit in atomic transaction");
}
@@ -82,10 +72,26 @@
}
@Override
- public void cleanupForAbort() {
- // each opTracker should be cleaned
- opTrackers.forEach((resId, opTracker) -> ((PrimaryIndexOperationTracker) opTracker)
- .cleanupNumActiveOperationsForAbortedJob(indexPendingOps.get(resId).get()));
+ public void cleanup() {
+ switch (getTxnState()) {
+ case ITransactionManager.ABORTED:
+ case ITransactionManager.COMMITTED:
+ for (Map.Entry<Long, ILSMOperationTracker> opTracker : opTrackers.entrySet()) {
+ try {
+ final long resId = opTracker.getKey();
+ final int idxPendingOps = indexPendingOps.get(resId).intValue();
+ for (int i = 0; i < idxPendingOps; i++) {
+ opTracker.getValue().completeOperation(null, LSMOperationType.FORCE_MODIFICATION, null,
+ callbacks.get(resId));
+ }
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException("invalid state in txn clean up: " + getTxnState());
+ }
}
@Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index 9d2f54b..9fcb08b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -20,11 +20,11 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -56,8 +56,7 @@
resourcePendingOps.put(resourceId, pendingOps);
if (primaryIndex) {
Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
- new Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>(
- (PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
+ new Pair<>((PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
primaryIndexTrackers.put(partition, pair);
}
}
@@ -69,11 +68,6 @@
}
@Override
- public void notifyUpdateCommitted(long resourceId) {
- // no op
- }
-
- @Override
public void notifyEntityCommitted(int partition) {
try {
Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
@@ -90,11 +84,18 @@
}
@Override
- protected void cleanupForAbort() {
- for (Entry<Integer, Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>> e : primaryIndexTrackers
- .entrySet()) {
- AtomicInteger pendingOps = partitionPendingOps.get(e.getKey());
- e.getValue().first.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
+ protected void cleanup() {
+ if (getTxnState() == ITransactionManager.ABORTED) {
+ primaryIndexTrackers.forEach((partitionId, opTracker) -> {
+ int pendingOps = partitionPendingOps.get(partitionId).intValue();
+ for (int i = 0; i < pendingOps; i++) {
+ try {
+ opTracker.first.completeOperation(null, LSMOperationType.MODIFICATION, null, opTracker.second);
+ } catch (HyracksDataException ex) {
+ throw new ACIDException(ex);
+ }
+ }
+ });
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index c60e673..fb2bdeb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -22,6 +22,8 @@
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BooleanSupplier;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.IOInterruptibleAction;
@@ -234,4 +236,20 @@
}
}
}
+
+ /**
+ * Runs the supplied {@code action} until {@code stopCondition} is met or timeout.
+ */
+ public static void runWithTimeout(ThrowingAction action, BooleanSupplier stopCondition, long timeout, TimeUnit unit)
+ throws Exception {
+ long remainingTime = unit.toNanos(timeout);
+ final long startTime = System.nanoTime();
+ while (!stopCondition.getAsBoolean()) {
+ if (remainingTime <= 0) {
+ throw new TimeoutException("Stop condition was not met after " + unit.toSeconds(timeout) + " seconds.");
+ }
+ action.run();
+ remainingTime -= System.nanoTime() - startTime;
+ }
+ }
}