Merge branch 'gerrit/neo'

Change-Id: I493e78c38f1a41557fdb4b960b58dbdc20b62104
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f40628fb..0b80881 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -52,6 +52,7 @@
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
@@ -66,6 +67,8 @@
 import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
@@ -813,10 +816,10 @@
         RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
                 filterFields == null ? 0 : filterFields.length, recordType, metaType);
         // fix pk fields
-        int diff = upsertOutRecDesc.getFieldCount() - primaryIndexInfo.rDesc.getFieldCount();
+        int start = 1 + (dataset.hasMetaPart() ? 2 : 1) + (filterFields == null ? 0 : filterFields.length);
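+        // the primary keys are the leading fields of the input record descriptor, which begins at 'start' in the upsert output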
         int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
         for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
-            pkFieldsInCommitOp[i] = diff + i;
+            pkFieldsInCommitOp[i] = start++;
         }
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
                 true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
@@ -827,19 +830,26 @@
 
     private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields,
             ARecordType itemType, ARecordType metaItemType) throws Exception {
-        ITypeTraits[] outputTypeTraits =
-                new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-        ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
-                + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+        // one indicator field (AINT8) at the beginning flags whether the operation was an upsert or a delete
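+        // output layout: [indicator][previous record][previous meta?][previous filter fields][input fields]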
+        int numOutFields = 1 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields + inputRecordDesc.getFieldCount();
+        ITypeTraits[] outputTypeTraits = new ITypeTraits[numOutFields];
+        ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[numOutFields];
 
-        // add the previous record first
+        ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
         int f = 0;
-        outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+        // add the upsert/delete indicator field
+        outputSerDes[f] = serdeProvider.getSerializerDeserializer(BuiltinType.AINT8);
+        outputTypeTraits[f] = typeTraitProvider.getTypeTrait(BuiltinType.AINT8);
+        f++;
+        // add the previous record
+        outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType);
+        outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType);
         f++;
         // add the previous meta second
         if (dataset.hasMetaPart()) {
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
-            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+            outputSerDes[f] = serdeProvider.getSerializerDeserializer(metaItemType);
+            outputTypeTraits[f] = typeTraitProvider.getTypeTrait(metaItemType);
             f++;
         }
         // add the previous filter third
@@ -854,10 +864,8 @@
                 }
             }
             fieldIdx = i;
-            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
-                    .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
-                    .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+            outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
             f++;
         }
         for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 24379a3..c5292d3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.dataflow;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -45,6 +44,7 @@
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -53,18 +53,28 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -84,7 +94,7 @@
     private static final boolean[] UNIQUE_META_FIELDS = null;
     private static final int[] KEY_INDEXES = { 0 };
     private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
-    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    private static final List<Integer> KEY_INDICATORS_LIST = List.of(Index.RECORD_INDICATOR);
     private static final int TOTAL_NUM_OF_RECORDS = 2000;
     private static final int RECORDS_PER_COMPONENT = 1000;
     private static final int DATASET_ID = 101;
@@ -102,6 +112,7 @@
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
     private static LSMPrimaryInsertOperatorNodePushable insertOp;
+    private static LSMPrimaryUpsertOperatorNodePushable upsertOp;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -143,6 +154,8 @@
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                 KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
+        upsertOp = nc.getUpsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, storageManager, null, false).getLeft();
     }
 
     @After
@@ -202,6 +215,63 @@
     }
 
     @Test
+    public void testCursorSwitchSucceedWithNoDuplicates() {
+        try {
+            StorageTestUtils.allowAllOps(lsmBtree);
+            lsmBtree.clearSearchCallbacks();
+            RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+                    KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            int totalNumRecords = LSMIndexSearchCursor.SWITCH_COMPONENT_CYCLE + 2;
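+            // a couple of tuples beyond the switch cycle so the cursor still has tuples to return after a component switch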
+            ITupleReference[] upsertTuples = new ITupleReference[totalNumRecords];
+            for (int j = 0; j < totalNumRecords; j++) {
+                ITupleReference tuple = tupleGenerator.next();
+                upsertTuples[j] = TupleUtils.copyTuple(tuple);
+            }
+
+            // upsert and flush the tuples to create a disk component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, true);
+            // upsert but don't flush the tuples to create a memory component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, false);
+
+            // do the search operation
+            ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(lsmBtree.getHarness(),
+                    lsmBtree.createOpContext(NoOpIndexAccessParameters.INSTANCE), LSMBTreeSearchCursor::new);
+            IIndexCursor searchCursor = accessor.createSearchCursor(false);
+            MultiComparator lowKeySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
+            MultiComparator highKeySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
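+            // null low and high keys with inclusive bounds, i.e. a full scan of the index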
+            RangePredicate rangePredicate =
+                    new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp, null, null);
+
+            accessor.search(searchCursor, rangePredicate);
+
+            int count = 0;
+            while (searchCursor.hasNext()) {
+                searchCursor.next();
+                count++;
+                // flush the memory component to disk so that the cursor switches to the new disk component when it hits the switch cycle
+                if (count == 1) {
+                    StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+                }
+            }
+
+            Throwable failure = ResourceReleaseUtils.close(searchCursor, null);
+            failure = CleanupUtils.destroy(failure, searchCursor, accessor);
+            Assert.assertEquals("Record count does not match", totalNumRecords, count);
+            if (failure != null) {
+                Assert.fail(failure.getMessage());
+            }
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
     public void testCursorSwitchFails() {
         try {
             // allow all operations
@@ -268,4 +338,17 @@
         emptyTupleOp.close();
         Assert.assertEquals(numOfRecords, countOp.getCount());
     }
+
+    private void upsert(FrameTupleAppender tupleAppender, int totalNumRecords, ITupleReference[] upsertTuples,
+            boolean flush) throws Exception {
+        upsertOp.open();
+        for (int j = 0; j < totalNumRecords; j++) {
+            DataflowUtils.addTupleToFrame(tupleAppender, upsertTuples[j], upsertOp);
+        }
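+        // push whatever is left in the frame to the upsert operator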
+        tupleAppender.write(upsertOp, true);
+        if (flush) {
+            StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+        }
+        upsertOp.close();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 514682b..968416c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -246,10 +246,15 @@
                     rangeCursors[i].close();
                     btreeAccessors[i].reset(btree, iap);
                     btreeAccessors[i].search(rangeCursors[i], reusablePred);
-                    pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
+                    // consume the element at which the search was restarted, since it had already been consumed before the switch
+                    if (rangeCursors[i].hasNext()) {
+                        rangeCursors[i].next();
+                        switchedElements[i].reset(rangeCursors[i].getTuple());
+                    }
                 }
             }
             switchRequest[i] = false;
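+            // clear the saved switch element now that the switch request has been handled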
+            switchedElements[i] = null;
             // any failed switch makes further switches pointless
             switchPossible = switchPossible && operationalComponents.get(i).getType() == LSMComponentType.DISK;
         }
@@ -274,14 +279,18 @@
                 if (replaceFrom < 0) {
                     replaceFrom = i;
                 }
-                // we return the outputElement to the priority queue if it came from this component
+
+                PriorityQueueElement element;
                 if (outputElement != null && outputElement.getCursorIndex() == i) {
-                    pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
-                    needPushElementIntoQueue = false;
-                    outputElement = null;
-                    canCallProceed = true;
+                    // there should be no element from this cursor in the queue, since its element was already polled as the output element
+                    if (findElement(outputPriorityQueue, i) != null) {
+                        throw new IllegalStateException("found element in the queue from the cursor of output element");
+                    }
+                    element = outputElement;
+                } else {
+                    element = findElement(outputPriorityQueue, i);
                 }
-                PriorityQueueElement element = remove(outputPriorityQueue, i);
+
                 // if this cursor is still active (has an element)
                 // then we copy the search key to restart the operation after
                 // replacing the component
@@ -341,6 +350,18 @@
         return null;
     }
 
+    private PriorityQueueElement findElement(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int cursorIndex) {
+        // scan the priority queue for the element that belongs to the given cursor
+        Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
+        while (it.hasNext()) {
+            PriorityQueueElement e = it.next();
+            if (e.getCursorIndex() == cursorIndex) {
+                return e;
+            }
+        }
+        return null;
+    }
+
     @Override
     public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
         LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
index 70f6d9e..5b0363a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
@@ -193,7 +193,7 @@
             for (int i = 0; i < count; i++) {
                 ILSMComponent removed = ctx.getComponentHolder().remove(swapIndexes[i]);
                 if (removed.getType() == LSMComponentType.MEMORY) {
-                    LOGGER.info("Removed a memory component from the search operation");
+                    LOGGER.debug("Removed memory component {} from the search operation", removed);
                 } else {
                     throw new IllegalStateException("Disk components can't be removed from the search operation");
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index b6f6e26..27875c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -40,7 +40,7 @@
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
-    protected static final int SWITCH_COMPONENT_CYCLE = 100;
+    public static final int SWITCH_COMPONENT_CYCLE = 100;
     protected final ILSMIndexOperationContext opCtx;
     protected final boolean returnDeletedTuples;
     protected PriorityQueueElement outputElement;
@@ -119,6 +119,7 @@
         needPushElementIntoQueue = false;
         for (int i = 0; i < switchRequest.length; i++) {
             switchRequest[i] = false;
+            switchedElements[i] = null;
         }
         try {
             if (outputPriorityQueue != null) {