[ASTERIXDB-3455][RT] Delete statement stuck indefinitely

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Use LSMPrimaryInsertOperatorNodePushable (with IndexOperation.DELETE)
instead of LSMInsertDeleteOperatorNodePushable for deletes.

Change-Id: I07c61009bd7bdce507151db7dccca195a6544c7f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18465
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
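
The gist of the fix, as a minimal standalone sketch (hypothetical,
simplified types; the real classes are LSMPrimaryInsertOperatorDescriptor
and LSMPrimaryInsertOperatorNodePushable and take many more parameters):
the primary-index pushable no longer hard-codes IndexOperation.UPSERT but
receives the operation as a parameter and dispatches on it, so deletes
reuse the primary insert pipeline instead of
LSMInsertDeleteOperatorNodePushable.

    enum IndexOperation { INSERT, UPSERT, DELETE }

    class PrimaryModificationSketch {
        private final IndexOperation op;

        PrimaryModificationSketch(IndexOperation op) {
            this.op = op; // previously hard-coded to IndexOperation.UPSERT
        }

        void processTuple(String tuple) {
            switch (op) {
                case INSERT:
                case UPSERT:
                    // the real pushable runs a uniqueness search first,
                    // then forceUpsert(tuple)
                    System.out.println("upsert " + tuple);
                    break;
                case DELETE:
                    // new path: the real pushable calls forceDelete(tuple)
                    System.out.println("delete " + tuple);
                    break;
                default:
                    throw new IllegalStateException("unsupported operation: " + op);
            }
        }

        public static void main(String[] args) {
            new PrimaryModificationSketch(IndexOperation.DELETE).processTuple("pk1");
        }
    }
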
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 668ff13..8aa25ad 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -275,7 +275,7 @@
ctx.getTaskAttemptId().getTaskId().getPartition(), indexHelperFactory, pkIndexHelperFactory,
primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, modOpCallbackFactory,
searchOpCallbackFactory, primaryKeyIndexes.length, filterFields, null, tuplePartitionerFactory,
- partitionsMap);
+ partitionsMap, IndexOperation.UPSERT);
// For now, this assumes a single secondary index. recordDesc is always <pk-record-meta>.
// For the index, we will have to create an assign operator that extracts the SK,
// then the secondary LSMInsertDeleteOperatorNodePushable.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 0a73ec9..023b01c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -157,7 +157,6 @@
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultSetId;
@@ -1138,28 +1137,28 @@
BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
partitioningProperties.getComputeStorageMap());
} else {
- if (indexOp == IndexOperation.INSERT) {
- ISearchOperationCallbackFactory searchCallbackFactory = dataset
- .getSearchCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
+ ISearchOperationCallbackFactory searchCallbackFactory =
+ dataset.getSearchCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
- Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
- dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName()).stream()
- .filter(Index::isPrimaryKeyIndex).findFirst();
- IIndexDataflowHelperFactory pkidfh = null;
- if (primaryKeyIndex.isPresent()) {
- PartitioningProperties idxPartitioningProperties =
- getPartitioningProperties(dataset, primaryKeyIndex.get().getIndexName());
- pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
- idxPartitioningProperties.getSplitsProvider());
- }
+ Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+ dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName()).stream()
+ .filter(Index::isPrimaryKeyIndex).findFirst();
+ IIndexDataflowHelperFactory pkidfh = null;
+ if (primaryKeyIndex.isPresent()) {
+ PartitioningProperties idxPartitioningProperties =
+ getPartitioningProperties(dataset, primaryKeyIndex.get().getIndexName());
+ pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+ idxPartitioningProperties.getSplitsProvider());
+ }
+ if (indexOp == IndexOperation.INSERT) {
op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory,
- partitioningProperties.getComputeStorageMap());
+ partitioningProperties.getComputeStorageMap(), IndexOperation.UPSERT);
} else {
- op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
- null, true, modificationCallbackFactory, partitionerFactory,
- partitioningProperties.getComputeStorageMap());
+ op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
+ modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory,
+ partitioningProperties.getComputeStorageMap(), IndexOperation.DELETE);
}
}
return new Pair<>(op, partitioningProperties.getConstraints());
@@ -1169,20 +1168,11 @@
RecordDescriptor inputRecordDesc, int[] fieldPermutation, IIndexDataflowHelperFactory idfh,
IIndexDataflowHelperFactory pkidfh, IModificationOperationCallbackFactory modificationCallbackFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int numKeys, int[] filterFields,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, IndexOperation op) {
// this can be used by extensions to pick up their own operators
return new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, tuplePartitionerFactory,
- partitionsMap);
- }
-
- protected LSMTreeInsertDeleteOperatorDescriptor createLSMTreeInsertDeleteOperatorDescriptor(
- IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] fieldPermutation, IndexOperation op,
- IIndexDataflowHelperFactory indexHelperFactory, ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
- IModificationOperationCallbackFactory modCallbackFactory, ITuplePartitionerFactory tuplePartitionerFactory,
- int[][] partitionsMap) {
- return new LSMTreeInsertDeleteOperatorDescriptor(spec, outRecDesc, fieldPermutation, op, indexHelperFactory,
- tupleFilterFactory, isPrimary, modCallbackFactory, tuplePartitionerFactory, partitionsMap);
+ partitionsMap, op);
}

private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexModificationRuntime(IndexOperation indexOp,
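
In effect, both branches above now build the same
LSMPrimaryInsertOperatorDescriptor and differ only in the IndexOperation
they pass down. A sketch of that mapping (hypothetical helper, not a real
MetadataProvider method):

    class OpSelectionSketch {
        enum IndexOperation { INSERT, UPSERT, DELETE }

        static IndexOperation primaryOpFor(IndexOperation indexOp) {
            // INSERT statements keep the old behavior of running as forced
            // upserts; the other case reaching this branch is DELETE
            return indexOp == IndexOperation.INSERT ? IndexOperation.UPSERT
                    : IndexOperation.DELETE;
        }

        public static void main(String[] args) {
            System.out.println(primaryOpFor(IndexOperation.INSERT)); // UPSERT
            System.out.println(primaryOpFor(IndexOperation.DELETE)); // DELETE
        }
    }
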
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
index f46e172..21f79da 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
@@ -45,9 +45,9 @@
IIndexDataflowHelperFactory keyIndexHelperFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory,
ISearchOperationCallbackFactory searchOpCallbackFactory, int numOfPrimaryKeys, int[] filterFields,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
- super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, null, true,
- modificationOpCallbackFactory, tuplePartitionerFactory, partitionsMap);
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, IndexOperation op) {
+ super(spec, outRecDesc, fieldPermutation, op, indexHelperFactory, null, true, modificationOpCallbackFactory,
+ tuplePartitionerFactory, partitionsMap);
this.keyIndexHelperFactory = keyIndexHelperFactory;
this.searchOpCallbackFactory = searchOpCallbackFactory;
this.numOfPrimaryKeys = numOfPrimaryKeys;
@@ -60,6 +60,6 @@
RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new LSMPrimaryInsertOperatorNodePushable(ctx, partition, indexHelperFactory, keyIndexHelperFactory,
fieldPermutation, inputRecDesc, modCallbackFactory, searchOpCallbackFactory, numOfPrimaryKeys,
- filterFields, sourceLoc, tuplePartitionerFactory, partitionsMap);
+ filterFields, sourceLoc, tuplePartitionerFactory, partitionsMap, op);
}
}
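
The descriptor-level change matters because the descriptor is the
serializable piece of the Hyracks job specification, while the node
pushable is created per partition at run time; the operation therefore has
to travel as descriptor state. A minimal sketch of that threading
(hypothetical, simplified types):

    import java.io.Serializable;

    class DescriptorSketch implements Serializable {
        enum IndexOperation { INSERT, UPSERT, DELETE }

        private final IndexOperation op;

        DescriptorSketch(IndexOperation op) {
            this.op = op;
        }

        // mirrors createPushRuntime: each partition's runtime receives the op
        Runnable createPushRuntime(int partition) {
            return () -> System.out.println("partition " + partition + ": " + op);
        }

        public static void main(String[] args) {
            new DescriptorSketch(IndexOperation.DELETE).createPushRuntime(0).run();
        }
    }
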
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 7fb3369..072a9b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -107,10 +107,10 @@
int[] fieldPermutation, RecordDescriptor inputRecDesc,
IModificationOperationCallbackFactory modCallbackFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, int[] filterFields,
- SourceLocation sourceLoc, ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap)
- throws HyracksDataException {
- super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
- modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
+ SourceLocation sourceLoc, ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap,
+ IndexOperation op) throws HyracksDataException {
+ super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory, null,
+ tuplePartitionerFactory, partitionsMap);
this.sourceLoc = sourceLoc;
this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
this.searchCallbacks = new ISearchOperationCallback[partitions.length];
@@ -160,47 +160,59 @@
// already processed; skip
return;
}
- keyTuple.reset(accessor, index);
- searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, keySearchCmp);
- boolean duplicate = false;
+ switch (op) {
+ case INSERT:
+ case UPSERT:
+ keyTuple.reset(accessor, index);
+ searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, keySearchCmp);
+ boolean duplicate = false;
- lsmAccessorForUniqunessCheck.search(cursor, searchPred);
- try {
- if (cursor.hasNext()) {
- // duplicate, skip
- if (searchCallback instanceof LockThenSearchOperationCallback) {
- ((LockThenSearchOperationCallback) searchCallback).release();
+ lsmAccessorForUniqunessCheck.search(cursor, searchPred);
+ try {
+ if (cursor.hasNext()) {
+ // duplicate, skip
+ if (searchCallback instanceof LockThenSearchOperationCallback) {
+ ((LockThenSearchOperationCallback) searchCallback).release();
+ }
+ duplicate = true;
+ }
+ } finally {
+ cursor.close();
}
- duplicate = true;
- }
- } finally {
- cursor.close();
- }
- if (!duplicate) {
- beforeModification(tuple);
- ((ILSMIndexAccessor) indexAccessor).forceUpsert(tuple);
- if (lsmAccessorForKeyIndex != null) {
- lsmAccessorForKeyIndex.forceUpsert(keyTuple);
- }
- } else {
- // we should flush previous inserted records so that these transactions can commit
- flushPartialFrame();
- // feed requires this nested exception to remove duplicated tuples
- // TODO: a better way to treat duplicates?
- throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
- HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+ if (!duplicate) {
+ beforeModification(tuple);
+ ((ILSMIndexAccessor) indexAccessor).forceUpsert(tuple);
+ if (lsmAccessorForKeyIndex != null) {
+ lsmAccessorForKeyIndex.forceUpsert(keyTuple);
+ }
+ } else {
+ // we should flush previously inserted records so that these transactions can commit
+ flushPartialFrame();
+ // feed requires this nested exception to remove duplicated tuples
+ // TODO: a better way to treat duplicates?
+ throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
+ HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+ }
+ break;
+ case DELETE:
+ ((ILSMIndexAccessor) indexAccessor).forceDelete(tuple);
+ break;
+ default:
+ throw HyracksDataException.create(ErrorCode.INVALID_OPERATOR_OPERATION, sourceLoc,
+ op.toString(), LSMPrimaryInsertOperatorNodePushable.class.getSimpleName());
+
}
processedTuples.add(index);
}

@Override
public void start() throws HyracksDataException {
- ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+ ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(op);
}

@Override
public void finish() throws HyracksDataException {
- ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+ ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(op);
}

@Override