Unexpose enter and exit components

Change-Id: Ib4ef7b432bbe6ac9cf2bbe9244cfe2b406f4cb93
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1778
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 40635c8..0a11684 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -59,6 +59,7 @@
 import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -92,15 +93,16 @@
     private IFrameOperationCallback frameOpCallback;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
     private AbstractIndexModificationOperationCallback abstractModCallback;
-    private final boolean hasSecondaries;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
+    private final IFrameTupleProcessor processor;
+    private LSMTreeIndexAccessor lsmAccessor;
 
     public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IModificationOperationCallbackFactory modCallbackFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, ARecordType recordType,
             int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
-            IMissingWriterFactory missingWriterFactory, boolean hasSecondaries) throws HyracksDataException {
+            IMissingWriterFactory missingWriterFactory, final boolean hasSecondaries) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
                 modCallbackFactory, null);
         this.key = new PermutingFrameTupleReference();
@@ -125,7 +127,61 @@
             this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
         }
-        this.hasSecondaries = hasSecondaries;
+        processor = new IFrameTupleProcessor() {
+            @Override
+            public void process(ITupleReference tuple, int index) throws HyracksDataException {
+                try {
+                    tb.reset();
+                    boolean recordWasInserted = false;
+                    boolean recordWasDeleted = false;
+                    resetSearchPredicate(index);
+                    if (isFiltered || hasSecondaries) {
+                        lsmAccessor.search(cursor, searchPred);
+                        if (cursor.hasNext()) {
+                            cursor.next();
+                            prevTuple = cursor.getTuple();
+                            cursor.reset(); // end the search
+                            appendFilterToPrevTuple();
+                            appendPrevRecord();
+                            appendPreviousMeta();
+                            appendFilterToOutput();
+                        } else {
+                            appendPreviousTupleAsMissing();
+                        }
+                    } else {
+                        searchCallback.before(key); // lock
+                        appendPreviousTupleAsMissing();
+                    }
+                    if (isDeleteOperation(tuple, numOfPrimaryKeys)) {
+                        // Only delete if it is a delete and not upsert
+                        abstractModCallback.setOp(Operation.DELETE);
+                        lsmAccessor.forceDelete(tuple);
+                        recordWasDeleted = true;
+                    } else {
+                        abstractModCallback.setOp(Operation.UPSERT);
+                        lsmAccessor.forceUpsert(tuple);
+                        recordWasInserted = true;
+                    }
+                    if (isFiltered && prevTuple != null) {
+                        // need to update the filter of the new component with the previous value
+                        lsmAccessor.updateFilter(prevTuple);
+                    }
+                    writeOutput(index, recordWasInserted, recordWasDeleted);
+                } catch (Exception e) {
+                    throw HyracksDataException.create(e);
+                }
+            }
+
+            @Override
+            public void start() throws HyracksDataException {
+                lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
+            }
+
+            @Override
+            public void finish() throws HyracksDataException {
+                lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
+            }
+        };
     }
 
     // we have the permutation which has [pk locations, record location, optional:filter-location]
@@ -163,15 +219,24 @@
             searchCallback = (LockThenSearchOperationCallback) searchCallbackFactory
                     .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
             indexAccessor = index.createAccessor(abstractModCallback, searchCallback);
-
+            lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
             INcApplicationContext appCtx =
                     (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                     appCtx.getTransactionSubsystem().getLogManager());
-            frameOpCallback =
-                    frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
+            frameOpCallback = new IFrameOperationCallback() {
+                IFrameOperationCallback callback =
+                        frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
+
+                @Override
+                public void frameCompleted() throws HyracksDataException {
+                    callback.frameCompleted();
+                    appender.write(writer, true);
+                }
+            };
+
         } catch (Exception e) {
             indexHelper.close();
             throw new HyracksDataException(e);
@@ -212,59 +277,7 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-        LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
-        int tupleCount = accessor.getTupleCount();
-        int i = 0;
-        lsmAccessor.enter();
-        try {
-            while (i < tupleCount) {
-                tb.reset();
-                boolean recordWasInserted = false;
-                boolean recordWasDeleted = false;
-                tuple.reset(accessor, i);
-                resetSearchPredicate(i);
-                if (isFiltered || hasSecondaries) {
-                    lsmAccessor.search(cursor, searchPred);
-                    if (cursor.hasNext()) {
-                        cursor.next();
-                        prevTuple = cursor.getTuple();
-                        cursor.reset(); // end the search
-                        appendFilterToPrevTuple();
-                        appendPrevRecord();
-                        appendPreviousMeta();
-                        appendFilterToOutput();
-                    } else {
-                        appendPreviousTupleAsMissing();
-                    }
-                } else {
-                    searchCallback.before(key); // lock
-                    appendPreviousTupleAsMissing();
-                }
-                if (isDeleteOperation(tuple, numOfPrimaryKeys)) {
-                    // Only delete if it is a delete and not upsert
-                    abstractModCallback.setOp(Operation.DELETE);
-                    lsmAccessor.forceDelete(tuple);
-                    recordWasDeleted = true;
-                } else {
-                    abstractModCallback.setOp(Operation.UPSERT);
-                    lsmAccessor.forceUpsert(tuple);
-                    recordWasInserted = true;
-                }
-                if (isFiltered && prevTuple != null) {
-                    // need to update the filter of the new component with the previous value
-                    lsmAccessor.updateFilter(prevTuple);
-                }
-                writeOutput(i, recordWasInserted, recordWasDeleted);
-                i++;
-            }
-            // callback here before calling nextFrame on the next operator
-            frameOpCallback.frameCompleted();
-            appender.write(writer, true);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        } finally {
-            lsmAccessor.exit();
-        }
+        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
     }
 
     private void appendFilterToOutput() throws IOException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
new file mode 100644
index 0000000..3fbe6cd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public interface IFrameTupleProcessor {
+
+    /**
+     * Called once per batch before starting the batch process
+     */
+    void start() throws HyracksDataException;
+
+    /**
+     * process the frame tuple with the frame index
+     *
+     * @param tuple
+     *            the tuple
+     * @param index
+     *            the index of the tuple in the frame
+     * @throws HyracksDataException
+     */
+    void process(ITupleReference tuple, int index) throws HyracksDataException;
+
+    /**
+     * Called once per batch before ending the batch process
+     */
+    void finish() throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index f9a4e80..0fe7e04 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -22,6 +22,8 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -197,26 +199,28 @@
             throws HyracksDataException;
 
     /**
-     * Enter components for the operation
-     *
-     * @param ctx
-     * @throws HyracksDataException
-     */
-    void enter(ILSMIndexOperationContext ctx) throws HyracksDataException;
-
-    /**
-     * Exits components for the operation
-     *
-     * @param ctx
-     * @throws HyracksDataException
-     */
-    void exit(ILSMIndexOperationContext ctx) throws HyracksDataException;
-
-    /**
      * Update the filter with the value in the passed tuple
      *
      * @param ctx
      * @throws HyracksDataException
      */
     void updateFilter(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException;
+
+    /**
+     * Perform batch operation on all tuples in the passed frame tuple accessor
+     *
+     * @param ctx
+     *            the operation ctx
+     * @param accessor
+     *            the frame tuple accessor
+     * @param tuple
+     *            the mutable tuple used to pass the tuple to the processor
+     * @param processor
+     *            the tuple processor
+     * @param frameOpCallback
+     *            the callback at the end of the frame
+     * @throws HyracksDataException
+     */
+    void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index f846c80..ad53e73 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -35,16 +35,6 @@
 public interface ILSMIndexAccessor extends IIndexAccessor {
 
     /**
-     * Enter the memory component for modification
-     */
-    void enter() throws HyracksDataException;
-
-    /**
-     * Exit the memory component
-     */
-    void exit() throws HyracksDataException;
-
-    /**
      * Schedule a flush operation
      *
      * @param callback
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index e315858..1502706 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -29,9 +29,13 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
@@ -586,16 +590,14 @@
         lsmIndex.updateFilter(ctx, tuple);
     }
 
-    @Override
-    public void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
+    private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
         if (!lsmIndex.isMemoryComponentsAllocated()) {
             lsmIndex.allocateMemoryComponents();
         }
         getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
     }
 
-    @Override
-    public void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
+    private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
         getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION);
     }
 
@@ -608,4 +610,26 @@
             exitAndComplete(ctx, op);
         }
     }
+
+    @Override
+    public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+        processor.start();
+        enter(ctx);
+        try {
+            int tupleCount = accessor.getTupleCount();
+            int i = 0;
+            while (i < tupleCount) {
+                tuple.reset(accessor, i);
+                processor.process(tuple, i);
+                i++;
+            }
+            frameOpCallback.frameCompleted();
+            processor.finish();
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        } finally {
+            exit(ctx);
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index a41d2c0..20d71e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -23,15 +23,19 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -200,20 +204,13 @@
         return cursorFactory.create(ctx);
     }
 
-    @Override
-    public void enter() throws HyracksDataException {
-        ctx.setOperation(IndexOperation.UPSERT);
-        lsmHarness.enter(ctx);
-    }
-
-    @Override
-    public void exit() throws HyracksDataException {
-        ctx.setOperation(IndexOperation.UPSERT);
-        lsmHarness.exit(ctx);
-    }
-
     public void updateFilter(ITupleReference tuple) throws HyracksDataException {
         ctx.setOperation(IndexOperation.UPSERT);
         lsmHarness.updateFilter(ctx, tuple);
     }
+
+    public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
+            IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 2482103..fb5cc9f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -202,23 +202,4 @@
     public void forceUpsert(ITupleReference tuple) throws HyracksDataException {
         throw new UnsupportedOperationException("Upsert not supported by lsm inverted index.");
     }
-
-    /**
-     * enter the memory component for modification
-     */
-    @Override
-    public void enter() throws HyracksDataException {
-        ctx.setOperation(IndexOperation.UPSERT);
-        lsmHarness.enter(ctx);
-    }
-
-    /**
-     * exit the memory component
-     */
-    @Override
-    public void exit() throws HyracksDataException {
-        ctx.setOperation(IndexOperation.UPSERT);
-        lsmHarness.exit(ctx);
-    }
-
 }