[ASTERIXDB-2301][TX] Fix Abort of DELETE operation
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Fix the undo logic of the DELETE operation. Previously, undoing a
DELETE was implemented as re-inserting the old value. However, if the
deleted record comes from a disk component, the old value can be null.
In that case we simply need to physically delete the anti-matter key
from the memory component, exactly as when undoing an UPSERT (see the
sketch below).
- Add test cases for undo.
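- Minimal sketch of the resulting undo dispatch (illustration only, not
part of the patch; the helper name is hypothetical, and the imports of
ILSMIndexAccessor, ILogRecord, and the *_BYTE constants are assumed to
come from RecoveryManager's existing context; the authoritative change
is in RecoveryManager.java below):

    // Sketch: mirrors the switch added to RecoveryManager below.
    // INSERT is undone by force-deleting the new value; DELETE and
    // UPSERT share one path because the logged old value may be null
    // when the affected record resides in a disk component.
    private static void undoEntityOperation(ILSMIndexAccessor accessor, ILogRecord log)
            throws HyracksDataException {
        switch (log.getNewOp()) {
            case AbstractIndexModificationOperationCallback.INSERT_BYTE:
                accessor.forceDelete(log.getNewValue());
                break;
            case AbstractIndexModificationOperationCallback.DELETE_BYTE:
            case AbstractIndexModificationOperationCallback.UPSERT_BYTE:
                if (log.getOldValue() == null) {
                    // no old value: physically delete the anti-matter key
                    // from the memory component
                    accessor.forcePhysicalDelete(log.getNewValue());
                } else {
                    accessor.forceInsert(log.getOldValue());
                }
                break;
            default:
                throw new IllegalStateException("Unsupported OperationType: " + log.getNewOp());
        }
    }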
Change-Id: I5002d639399f024be8837da1c539101e6d62a159
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2432
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index a8d8610..e23be96 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -730,15 +730,19 @@
(ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
- if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) {
- indexAccessor.forceDelete(logRecord.getNewValue());
- } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.DELETE_BYTE) {
- indexAccessor.forceInsert(logRecord.getOldValue());
- } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) {
- // undo, upsert the old value if found, otherwise, physical delete
- undoUpsert(indexAccessor, logRecord);
- } else {
- throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
+ switch (logRecord.getNewOp()) {
+ case AbstractIndexModificationOperationCallback.INSERT_BYTE:
+ indexAccessor.forceDelete(logRecord.getNewValue());
+ break;
+ case AbstractIndexModificationOperationCallback.DELETE_BYTE:
+ // undo DELETE using the same logic as undo UPSERT, since the old value
+ // could likewise be null if the deleted record is from a disk component
+ case AbstractIndexModificationOperationCallback.UPSERT_BYTE:
+ // undo: upsert the old value if found; otherwise, physically delete
+ undoUpsertOrDelete(indexAccessor, logRecord);
+ break;
+ default:
+ throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
}
} finally {
indexAccessor.destroy();
@@ -748,7 +752,8 @@
}
}
- private static void undoUpsert(ILSMIndexAccessor indexAccessor, ILogRecord logRecord) throws HyracksDataException {
+ private static void undoUpsertOrDelete(ILSMIndexAccessor indexAccessor, ILogRecord logRecord)
+ throws HyracksDataException {
if (logRecord.getOldValue() == null) {
try {
indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f99429c..e8e38eb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -189,6 +189,15 @@
Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
StorageComponentProvider storageComponentProvider, Index secondaryIndex)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return getInsertPipeline(ctx, dataset, primaryKeyTypes, recordType, metaType, filterFields, primaryKeyIndexes,
+ primaryKeyIndicators, storageComponentProvider, secondaryIndex, IndexOperation.INSERT);
+ }
+
+ public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
+ Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
+ int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
+ StorageComponentProvider storageComponentProvider, Index secondaryIndex, IndexOperation op)
throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
CcApplicationContext appCtx =
(CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
@@ -200,7 +209,6 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
- IndexOperation op = IndexOperation.INSERT;
IModificationOperationCallbackFactory modOpCallbackFactory =
new PrimaryIndexModificationOperationCallbackFactory(dataset.getDatasetId(),
primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index d9f2e20..b2b03b3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -57,6 +57,7 @@
import org.apache.hyracks.api.test.CountAnswer;
import org.apache.hyracks.api.test.FrameWriterTestUtils;
import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
@@ -121,6 +122,13 @@
}
public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+ Index secondaryIndex, IndexOperation op)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, op).getLeft();
+ }
+
+ public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
Index secondaryIndex) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
new file mode 100644
index 0000000..06b213d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.nio.file.Paths;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.test.base.TestMethodTracer;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+public class TransactionAbortTest {
+ private static TestNodeController nc;
+ private static TestLsmBtree lsmBtree;
+ private static NCAppRuntimeContext ncAppCtx;
+ private static IDatasetLifecycleManager dsLifecycleMgr;
+ private static IHyracksTaskContext ctx;
+ private static IIndexDataflowHelper indexDataflowHelper;
+ private static final int PARTITION = 0;
+ private static LSMInsertDeleteOperatorNodePushable insertOp;
+ private static int NUM_INSERT_RECORDS = 1000;
+ private static ITransactionContext txnCtx;
+
+ private static IHyracksTaskContext abortCtx;
+ private static ITransactionContext abortTxnCtx;
+ private static LSMInsertDeleteOperatorNodePushable abortOp;
+ private static TupleGenerator tupleGenerator;
+
+ @Rule
+ public TestRule watcher = new TestMethodTracer();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TestHelper.deleteExistingInstanceFiles();
+ String configPath = Paths.get(System.getProperty("user.dir"), "src", "test", "resources", "cc.conf").toString();
+ nc = new TestNodeController(configPath, false);
+ nc.init();
+ ncAppCtx = nc.getAppRuntimeContext();
+ dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ nc.deInit();
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @Before
+ public void createIndex() throws Exception {
+ PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
+ IndexDataflowHelperFactory iHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+ JobId jobId = nc.newJobId();
+ ctx = nc.createTestContext(jobId, PARTITION, false);
+ indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
+ indexDataflowHelper.open();
+ lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+ indexDataflowHelper.close();
+
+ txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+ insertOp = StorageTestUtils.getInsertPipeline(nc, ctx, null);
+
+ JobId abortJobId = nc.newJobId();
+ abortCtx = nc.createTestContext(abortJobId, PARTITION, false);
+ abortTxnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(abortCtx),
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+ // abortOp is initialized by each test separately
+ tupleGenerator = StorageTestUtils.getTupleGenerator();
+ }
+
+ @Test
+ public void testAbortDeleteFromDiskComponent() throws Exception {
+ ITupleReference lastTuple = insertRecords(NUM_INSERT_RECORDS);
+ StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, false);
+
+ Assert.assertEquals(1, lsmBtree.getDiskComponents().size());
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
+
+ abortOp = StorageTestUtils.getInsertPipeline(nc, abortCtx, null, IndexOperation.DELETE);
+ testAbort(lastTuple);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
+ }
+
+ @Test
+ public void testAbortDeleteFromMemoryComponent() throws Exception {
+ ITupleReference lastTuple = insertRecords(NUM_INSERT_RECORDS);
+ Assert.assertEquals(0, lsmBtree.getDiskComponents().size());
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
+
+ abortOp = StorageTestUtils.getInsertPipeline(nc, abortCtx, null, IndexOperation.DELETE);
+ testAbort(lastTuple);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
+ }
+
+ @Test
+ public void testAbortInsert() throws Exception {
+ insertRecords(NUM_INSERT_RECORDS);
+ Assert.assertEquals(0, lsmBtree.getDiskComponents().size());
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
+
+ abortOp = StorageTestUtils.getInsertPipeline(nc, abortCtx, null, IndexOperation.INSERT);
+ testAbort(tupleGenerator.next());
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
+ }
+
+ @After
+ public void destroyIndex() throws Exception {
+ indexDataflowHelper.destroy();
+ }
+
+ private ITupleReference insertRecords(int numRecords) throws Exception {
+ StorageTestUtils.allowAllOps(lsmBtree);
+ insertOp.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ ITupleReference tuple = null;
+ for (int i = 0; i < numRecords; i++) {
+ tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+ return tuple;
+ }
+
+ private void testAbort(ITupleReference tuple) throws Exception {
+ setFailModificationCallback(lsmBtree);
+ abortOp.open();
+ boolean aborted = false;
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ try {
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, abortOp);
+ tupleAppender.write(abortOp, true);
+ } catch (HyracksDataException e) {
+ StorageTestUtils.allowAllOps(lsmBtree);
+ nc.getTransactionManager().abortTransaction(abortTxnCtx.getTxnId());
+ aborted = true;
+ } finally {
+ abortOp.close();
+ }
+ Assert.assertTrue(aborted);
+ }
+
+ private void setFailModificationCallback(TestLsmBtree index) {
+ index.clearModifyCallbacks();
+ index.addModifyCallback(new ITestOpCallback<Semaphore>() {
+ @Override
+ public void before(Semaphore t) throws HyracksDataException {
+ t.release();
+ }
+
+ @Override
+ public void after() throws HyracksDataException {
+ // manually set the current memory component as modified
+ index.getCurrentMemoryComponent().setModified();
+ throw new HyracksDataException("Fail the job");
+ }
+ });
+ }
+
+}