[ASTERIXDB-3455][RT] Delete statement stuck indefinitely

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Use LSMPrimaryInsertNodePushable instead of
LSMInsertDeleteOperatorNodePushable for deletes.

Change-Id: I07c61009bd7bdce507151db7dccca195a6544c7f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18465
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 668ff13..8aa25ad 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -275,7 +275,7 @@
                     ctx.getTaskAttemptId().getTaskId().getPartition(), indexHelperFactory, pkIndexHelperFactory,
                     primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, modOpCallbackFactory,
                     searchOpCallbackFactory, primaryKeyIndexes.length, filterFields, null, tuplePartitionerFactory,
-                    partitionsMap);
+                    partitionsMap, IndexOperation.UPSERT);
             // For now, this assumes a single secondary index. recordDesc is always <pk-record-meta>
             // for the index, we will have to create an assign operator that extract the sk
             // then the secondary LSMInsertDeleteOperatorNodePushable
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 0a73ec9..023b01c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -157,7 +157,6 @@
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
@@ -1138,28 +1137,28 @@
                     BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
                     partitioningProperties.getComputeStorageMap());
         } else {
-            if (indexOp == IndexOperation.INSERT) {
-                ISearchOperationCallbackFactory searchCallbackFactory = dataset
-                        .getSearchCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory =
+                    dataset.getSearchCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
 
-                Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
-                        dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName()).stream()
-                        .filter(Index::isPrimaryKeyIndex).findFirst();
-                IIndexDataflowHelperFactory pkidfh = null;
-                if (primaryKeyIndex.isPresent()) {
-                    PartitioningProperties idxPartitioningProperties =
-                            getPartitioningProperties(dataset, primaryKeyIndex.get().getIndexName());
-                    pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
-                            idxPartitioningProperties.getSplitsProvider());
-                }
+            Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+                    dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName()).stream()
+                    .filter(Index::isPrimaryKeyIndex).findFirst();
+            IIndexDataflowHelperFactory pkidfh = null;
+            if (primaryKeyIndex.isPresent()) {
+                PartitioningProperties idxPartitioningProperties =
+                        getPartitioningProperties(dataset, primaryKeyIndex.get().getIndexName());
+                pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+                        idxPartitioningProperties.getSplitsProvider());
+            }
+            if (indexOp == IndexOperation.INSERT) {
                 op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
                         modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory,
-                        partitioningProperties.getComputeStorageMap());
+                        partitioningProperties.getComputeStorageMap(), IndexOperation.UPSERT);
 
             } else {
-                op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
-                        null, true, modificationCallbackFactory, partitionerFactory,
-                        partitioningProperties.getComputeStorageMap());
+                op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
+                        modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory,
+                        partitioningProperties.getComputeStorageMap(), IndexOperation.DELETE);
             }
         }
         return new Pair<>(op, partitioningProperties.getConstraints());
@@ -1169,20 +1168,11 @@
             RecordDescriptor inputRecordDesc, int[] fieldPermutation, IIndexDataflowHelperFactory idfh,
             IIndexDataflowHelperFactory pkidfh, IModificationOperationCallbackFactory modificationCallbackFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, int numKeys, int[] filterFields,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, IndexOperation op) {
         // this can be used by extensions to pick up their own operators
         return new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
                 modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, tuplePartitionerFactory,
-                partitionsMap);
-    }
-
-    protected LSMTreeInsertDeleteOperatorDescriptor createLSMTreeInsertDeleteOperatorDescriptor(
-            IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] fieldPermutation, IndexOperation op,
-            IIndexDataflowHelperFactory indexHelperFactory, ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
-            IModificationOperationCallbackFactory modCallbackFactory, ITuplePartitionerFactory tuplePartitionerFactory,
-            int[][] partitionsMap) {
-        return new LSMTreeInsertDeleteOperatorDescriptor(spec, outRecDesc, fieldPermutation, op, indexHelperFactory,
-                tupleFilterFactory, isPrimary, modCallbackFactory, tuplePartitionerFactory, partitionsMap);
+                partitionsMap, op);
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexModificationRuntime(IndexOperation indexOp,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
index f46e172..21f79da 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
@@ -45,9 +45,9 @@
             IIndexDataflowHelperFactory keyIndexHelperFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory,
             ISearchOperationCallbackFactory searchOpCallbackFactory, int numOfPrimaryKeys, int[] filterFields,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
-        super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, null, true,
-                modificationOpCallbackFactory, tuplePartitionerFactory, partitionsMap);
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, IndexOperation op) {
+        super(spec, outRecDesc, fieldPermutation, op, indexHelperFactory, null, true, modificationOpCallbackFactory,
+                tuplePartitionerFactory, partitionsMap);
         this.keyIndexHelperFactory = keyIndexHelperFactory;
         this.searchOpCallbackFactory = searchOpCallbackFactory;
         this.numOfPrimaryKeys = numOfPrimaryKeys;
@@ -60,6 +60,6 @@
         RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMPrimaryInsertOperatorNodePushable(ctx, partition, indexHelperFactory, keyIndexHelperFactory,
                 fieldPermutation, intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numOfPrimaryKeys,
-                filterFields, sourceLoc, tuplePartitionerFactory, partitionsMap);
+                filterFields, sourceLoc, tuplePartitionerFactory, partitionsMap, op);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 7fb3369..072a9b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -107,10 +107,10 @@
             int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IModificationOperationCallbackFactory modCallbackFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, int[] filterFields,
-            SourceLocation sourceLoc, ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap)
-            throws HyracksDataException {
-        super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
-                modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
+            SourceLocation sourceLoc, ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap,
+            IndexOperation op) throws HyracksDataException {
+        super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory, null,
+                tuplePartitionerFactory, partitionsMap);
         this.sourceLoc = sourceLoc;
         this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
         this.searchCallbacks = new ISearchOperationCallback[partitions.length];
@@ -160,47 +160,59 @@
                         // already processed; skip
                         return;
                     }
-                    keyTuple.reset(accessor, index);
-                    searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, keySearchCmp);
-                    boolean duplicate = false;
+                    switch (op) {
+                        case INSERT:
+                        case UPSERT:
+                            keyTuple.reset(accessor, index);
+                            searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, keySearchCmp);
+                            boolean duplicate = false;
 
-                    lsmAccessorForUniqunessCheck.search(cursor, searchPred);
-                    try {
-                        if (cursor.hasNext()) {
-                            // duplicate, skip
-                            if (searchCallback instanceof LockThenSearchOperationCallback) {
-                                ((LockThenSearchOperationCallback) searchCallback).release();
+                            lsmAccessorForUniqunessCheck.search(cursor, searchPred);
+                            try {
+                                if (cursor.hasNext()) {
+                                    // duplicate, skip
+                                    if (searchCallback instanceof LockThenSearchOperationCallback) {
+                                        ((LockThenSearchOperationCallback) searchCallback).release();
+                                    }
+                                    duplicate = true;
+                                }
+                            } finally {
+                                cursor.close();
                             }
-                            duplicate = true;
-                        }
-                    } finally {
-                        cursor.close();
-                    }
-                    if (!duplicate) {
-                        beforeModification(tuple);
-                        ((ILSMIndexAccessor) indexAccessor).forceUpsert(tuple);
-                        if (lsmAccessorForKeyIndex != null) {
-                            lsmAccessorForKeyIndex.forceUpsert(keyTuple);
-                        }
-                    } else {
-                        // we should flush previous inserted records so that these transactions can commit
-                        flushPartialFrame();
-                        // feed requires this nested exception to remove duplicated tuples
-                        // TODO: a better way to treat duplicates?
-                        throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
-                                HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+                            if (!duplicate) {
+                                beforeModification(tuple);
+                                ((ILSMIndexAccessor) indexAccessor).forceUpsert(tuple);
+                                if (lsmAccessorForKeyIndex != null) {
+                                    lsmAccessorForKeyIndex.forceUpsert(keyTuple);
+                                }
+                            } else {
+                                // we should flush previous inserted records so that these transactions can commit
+                                flushPartialFrame();
+                                // feed requires this nested exception to remove duplicated tuples
+                                // TODO: a better way to treat duplicates?
+                                throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
+                                        HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+                            }
+                            break;
+                        case DELETE:
+                            ((ILSMIndexAccessor) indexAccessor).forceDelete(tuple);
+                            break;
+                        default:
+                            throw HyracksDataException.create(ErrorCode.INVALID_OPERATOR_OPERATION, sourceLoc,
+                                    op.toString(), LSMPrimaryInsertOperatorNodePushable.class.getSimpleName());
+
                     }
                     processedTuples.add(index);
                 }
 
                 @Override
                 public void start() throws HyracksDataException {
-                    ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+                    ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(op);
                 }
 
                 @Override
                 public void finish() throws HyracksDataException {
-                    ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+                    ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(op);
                 }
 
                 @Override