[NO ISSUE][STO] Fix search when switching from memory to disk component

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- When a search switches from the memory components to the newly
  flushed disk components, preserve the state of the priority queue
  and of the cursors so that the switched-to disk components resume
  exactly where the memory components left off. If a cursor is the one
  that produced the current outputElement, do not push another element
  into the queue for that cursor, since the queue must not contain an
  element from that cursor while the output is still held. Restart the
  search on each switched cursor at the element it was positioned on
  and consume that element, because it was already consumed before the
  switch (see the sketch after this list).

- Add a test case that exercises the component switch during a search.
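
  The following is a minimal, self-contained Java sketch of the
  restore-and-consume idea described above. All names in it (Element,
  SimpleCursor, restartAfterSwitch, findElement) are hypothetical
  simplifications introduced for illustration only; they are not the
  Hyracks classes touched by this change (see LSMBTreeRangeSearchCursor
  in the diff for the actual code).

    import java.util.Iterator;
    import java.util.PriorityQueue;

    final class ComponentSwitchSketch {

        // One entry per cursor in the merge priority queue.
        static final class Element {
            final int cursorIndex;
            int key;
            Element(int cursorIndex, int key) {
                this.cursorIndex = cursorIndex;
                this.key = key;
            }
        }

        // Stand-in for a per-component range cursor that can be
        // re-opened at a given key.
        interface SimpleCursor {
            void reopenAt(int key); // restart the search at a saved position
            boolean hasNext();
            int next();             // return the next key and advance
        }

        // Re-establish the queue/cursor state after cursor i was switched
        // from a memory component to its flushed disk component. 'output'
        // is the element the caller currently holds (already polled from
        // the queue), or null.
        static void restartAfterSwitch(int i, SimpleCursor newCursor,
                PriorityQueue<Element> queue, Element output) {
            // Locate the element cursor i contributed before the switch:
            // it is either the held outputElement or the single queue
            // entry belonging to this cursor.
            Element switched;
            if (output != null && output.cursorIndex == i) {
                // The queue must not also contain an element for this
                // cursor, and nothing new is pushed for it here.
                if (findElement(queue, i) != null) {
                    throw new IllegalStateException(
                            "queue already holds an element for cursor " + i);
                }
                switched = output;
            } else {
                switched = findElement(queue, i);
            }
            if (switched == null) {
                return; // cursor i had no element in flight
            }
            // Restart the disk cursor at the key the memory cursor was on
            // and consume that first hit, because it was already consumed
            // before the switch.
            newCursor.reopenAt(switched.key);
            if (newCursor.hasNext()) {
                // same logical element, now read from the disk component
                switched.key = newCursor.next();
            }
        }

        static Element findElement(PriorityQueue<Element> queue, int cursorIndex) {
            for (Iterator<Element> it = queue.iterator(); it.hasNext();) {
                Element e = it.next();
                if (e.cursorIndex == cursorIndex) {
                    return e;
                }
            }
            return null;
        }
    }

  The important design point, reflected in the diff, is that the
  switched element keeps its place in the queue (or remains the held
  output element); only the data it references is refreshed from the
  restarted disk cursor, so no element is pushed or removed as part of
  the switch.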

Change-Id: I647641f6044c1edf1477049be1c5d1b697f404c1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14885
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f40628fb..0b80881 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -52,6 +52,7 @@
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
@@ -66,6 +67,8 @@
 import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
@@ -813,10 +816,10 @@
         RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
                 filterFields == null ? 0 : filterFields.length, recordType, metaType);
         // fix pk fields
-        int diff = upsertOutRecDesc.getFieldCount() - primaryIndexInfo.rDesc.getFieldCount();
+        int start = 1 + (dataset.hasMetaPart() ? 2 : 1) + (filterFields == null ? 0 : filterFields.length);
         int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
         for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
-            pkFieldsInCommitOp[i] = diff + i;
+            pkFieldsInCommitOp[i] = start++;
         }
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
                 true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
@@ -827,19 +830,26 @@
 
     private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields,
             ARecordType itemType, ARecordType metaItemType) throws Exception {
-        ITypeTraits[] outputTypeTraits =
-                new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-        ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
-                + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+        // 1 boolean field at the beginning to indicate whether the operation was upsert or delete
+        int numOutFields = 1 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields + inputRecordDesc.getFieldCount();
+        ITypeTraits[] outputTypeTraits = new ITypeTraits[numOutFields];
+        ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[numOutFields];
 
-        // add the previous record first
+        ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
         int f = 0;
-        outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+        // add the upsert indicator boolean field
+        outputSerDes[f] = serdeProvider.getSerializerDeserializer(BuiltinType.AINT8);
+        outputTypeTraits[f] = typeTraitProvider.getTypeTrait(BuiltinType.AINT8);
+        f++;
+        // add the previous record
+        outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType);
+        outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType);
         f++;
         // add the previous meta second
         if (dataset.hasMetaPart()) {
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
-            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+            outputSerDes[f] = serdeProvider.getSerializerDeserializer(metaItemType);
+            outputTypeTraits[f] = typeTraitProvider.getTypeTrait(metaItemType);
             f++;
         }
         // add the previous filter third
@@ -854,10 +864,8 @@
                 }
             }
             fieldIdx = i;
-            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
-                    .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
-                    .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+            outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
             f++;
         }
         for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 24379a3..c5292d3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.dataflow;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -45,6 +44,7 @@
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -53,18 +53,28 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -84,7 +94,7 @@
     private static final boolean[] UNIQUE_META_FIELDS = null;
     private static final int[] KEY_INDEXES = { 0 };
     private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
-    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    private static final List<Integer> KEY_INDICATORS_LIST = List.of(Index.RECORD_INDICATOR);
     private static final int TOTAL_NUM_OF_RECORDS = 2000;
     private static final int RECORDS_PER_COMPONENT = 1000;
     private static final int DATASET_ID = 101;
@@ -102,6 +112,7 @@
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
     private static LSMPrimaryInsertOperatorNodePushable insertOp;
+    private static LSMPrimaryUpsertOperatorNodePushable upsertOp;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -143,6 +154,8 @@
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                 KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
+        upsertOp = nc.getUpsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, storageManager, null, false).getLeft();
     }
 
     @After
@@ -202,6 +215,63 @@
     }
 
     @Test
+    public void testCursorSwitchSucceedWithNoDuplicates() {
+        try {
+            StorageTestUtils.allowAllOps(lsmBtree);
+            lsmBtree.clearSearchCallbacks();
+            RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+                    KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            int totalNumRecords = LSMIndexSearchCursor.SWITCH_COMPONENT_CYCLE + 2;
+            ITupleReference[] upsertTuples = new ITupleReference[totalNumRecords];
+            for (int j = 0; j < totalNumRecords; j++) {
+                ITupleReference tuple = tupleGenerator.next();
+                upsertTuples[j] = TupleUtils.copyTuple(tuple);
+            }
+
+            // upsert and flush the tuples to create a disk component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, true);
+            // upsert but don't flush the tuples to create a memory component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, false);
+
+            // do the search operation
+            ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(lsmBtree.getHarness(),
+                    lsmBtree.createOpContext(NoOpIndexAccessParameters.INSTANCE), LSMBTreeSearchCursor::new);
+            IIndexCursor searchCursor = accessor.createSearchCursor(false);
+            MultiComparator lowKeySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
+            MultiComparator highKeySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
+            RangePredicate rangePredicate =
+                    new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp, null, null);
+
+            accessor.search(searchCursor, rangePredicate);
+
+            int count = 0;
+            while (searchCursor.hasNext()) {
+                searchCursor.next();
+                count++;
+                // flush the memory component to disk so that we make the switch to it when we hit the switch cycle
+                if (count == 1) {
+                    StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+                }
+            }
+
+            Throwable failure = ResourceReleaseUtils.close(searchCursor, null);
+            failure = CleanupUtils.destroy(failure, searchCursor, accessor);
+            Assert.assertEquals("Records count not matching", totalNumRecords, count);
+            if (failure != null) {
+                Assert.fail(failure.getMessage());
+            }
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
     public void testCursorSwitchFails() {
         try {
             // allow all operations
@@ -268,4 +338,17 @@
         emptyTupleOp.close();
         Assert.assertEquals(numOfRecords, countOp.getCount());
     }
+
+    private void upsert(FrameTupleAppender tupleAppender, int totalNumRecords, ITupleReference[] upsertTuples,
+            boolean flush) throws Exception {
+        upsertOp.open();
+        for (int j = 0; j < totalNumRecords; j++) {
+            DataflowUtils.addTupleToFrame(tupleAppender, upsertTuples[j], upsertOp);
+        }
+        tupleAppender.write(upsertOp, true);
+        if (flush) {
+            StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+        }
+        upsertOp.close();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 46d279f..2c5fb50 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -236,10 +236,15 @@
                     rangeCursors[i].close();
                     btreeAccessors[i].reset(btree, iap);
                     btreeAccessors[i].search(rangeCursors[i], reusablePred);
-                    pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
+                    // consume the element that we restarted the search at since before the switch it was consumed
+                    if (rangeCursors[i].hasNext()) {
+                        rangeCursors[i].next();
+                        switchedElements[i].reset(rangeCursors[i].getTuple());
+                    }
                 }
             }
             switchRequest[i] = false;
+            switchedElements[i] = null;
             // any failed switch makes further switches pointless
             switchPossible = switchPossible && operationalComponents.get(i).getType() == LSMComponentType.DISK;
         }
@@ -264,14 +269,18 @@
                 if (replaceFrom < 0) {
                     replaceFrom = i;
                 }
-                // we return the outputElement to the priority queue if it came from this component
+
+                PriorityQueueElement element;
                 if (outputElement != null && outputElement.getCursorIndex() == i) {
-                    pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
-                    needPushElementIntoQueue = false;
-                    outputElement = null;
-                    canCallProceed = true;
+                    // there should be no element from this cursor in the queue since the element was polled
+                    if (findElement(outputPriorityQueue, i) != null) {
+                        throw new IllegalStateException("found element in the queue from the cursor of output element");
+                    }
+                    element = outputElement;
+                } else {
+                    element = findElement(outputPriorityQueue, i);
                 }
-                PriorityQueueElement element = remove(outputPriorityQueue, i);
+
                 // if this cursor is still active (has an element)
                 // then we copy the search key to restart the operation after
                 // replacing the component
@@ -331,6 +340,18 @@
         return null;
     }
 
+    private PriorityQueueElement findElement(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int cursorIndex) {
+        // Scans the PQ for the component's element
+        Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
+        while (it.hasNext()) {
+            PriorityQueueElement e = it.next();
+            if (e.getCursorIndex() == cursorIndex) {
+                return e;
+            }
+        }
+        return null;
+    }
+
     @Override
     public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
         LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
index 70f6d9e..5b0363a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
@@ -193,7 +193,7 @@
             for (int i = 0; i < count; i++) {
                 ILSMComponent removed = ctx.getComponentHolder().remove(swapIndexes[i]);
                 if (removed.getType() == LSMComponentType.MEMORY) {
-                    LOGGER.info("Removed a memory component from the search operation");
+                    LOGGER.debug("Removed memory component {} from the search operation", removed);
                 } else {
                     throw new IllegalStateException("Disk components can't be removed from the search operation");
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index b6f6e26..27875c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -40,7 +40,7 @@
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
-    protected static final int SWITCH_COMPONENT_CYCLE = 100;
+    public static final int SWITCH_COMPONENT_CYCLE = 100;
     protected final ILSMIndexOperationContext opCtx;
     protected final boolean returnDeletedTuples;
     protected PriorityQueueElement outputElement;
@@ -119,6 +119,7 @@
         needPushElementIntoQueue = false;
         for (int i = 0; i < switchRequest.length; i++) {
             switchRequest[i] = false;
+            switchedElements[i] = null;
         }
         try {
             if (outputPriorityQueue != null) {