Fix Upsert to Never Enforce the First Operation

Change-Id: I8ec784e2d6ff39758ab701d4f36fc85c278178f2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1336
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 96f9e76..536366f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -24,12 +24,14 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
@@ -80,6 +82,7 @@
     private final boolean hasMeta;
     private final int filterFieldIndex;
     private final int metaFieldIndex;
+    private LockThenSearchOperationCallback searchCallback;
 
     public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
@@ -140,8 +143,9 @@
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
                     index, ctx, this);
-            indexAccessor = index.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
-                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
+            searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
+                    .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this);
+            indexAccessor = index.createAccessor(modCallback, searchCallback);
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
@@ -167,6 +171,12 @@
         }
         if (recordWasInserted || recordWasDeleted) {
             FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        } else {
+            try {
+                searchCallback.release();
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
         }
     }
 
@@ -185,6 +195,7 @@
         accessor.reset(buffer);
         LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
+        boolean firstModification = true;
         int i = 0;
         try {
             while (i < tupleCount) {
@@ -217,8 +228,9 @@
                         tb.addFieldEndOffset();
                     }
                     modCallback.setOp(Operation.DELETE);
-                    if (i == 0) {
+                    if (firstModification) {
                         lsmAccessor.delete(prevTuple);
+                        firstModification = false;
                     } else {
                         lsmAccessor.forceDelete(prevTuple);
                     }
@@ -236,8 +248,9 @@
                 }
                 if (!isNull(tuple, numOfPrimaryKeys)) {
                     modCallback.setOp(Operation.INSERT);
-                    if ((prevTuple == null) && (i == 0)) {
+                    if (firstModification) {
                         lsmAccessor.insert(tuple);
+                        firstModification = false;
                     } else {
                         lsmAccessor.forceInsert(tuple);
                     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index ef3b218..288e7a5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -42,6 +42,7 @@
     private final LSMIndexInsertUpdateDeleteOperatorNodePushable operatorNodePushable;
     private final ILogManager logManager;
     private final ILogRecord logRecord;
+    private int pkHash;
 
     public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ITransactionSubsystem txnSubsystem,
             ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) {
@@ -75,7 +76,7 @@
 
     @Override
     public void before(ITupleReference tuple) throws HyracksDataException {
-        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
+        pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
         try {
             if (operatorNodePushable != null) {
 
@@ -122,4 +123,8 @@
     private void logWait() throws ACIDException {
         logManager.log(logRecord);
     }
+
+    public void release() throws ACIDException {
+        lockManager.unlock(datasetId, pkHash, LockMode.X, txnCtx);
+    }
 }