Merge branch 'gerrit/neo' into 'gerrit/trinity'

Ext-ref: MB-64229
Change-Id: Ic3fd8f7abd64252ad2473b049a0d222747a43857
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b3..3ab86c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -47,6 +47,11 @@
         }
 
         @Override
+        public void beforeExit(boolean success) throws HyracksDataException {
+            // No Op
+        }
+
+        @Override
         public void close() throws IOException {
             // No Op
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index eadb614..5c87994 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.runtime.operators;
 
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -33,6 +35,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.api.util.HyracksThrowingAction;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -50,6 +53,7 @@
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
@@ -77,6 +81,7 @@
     private boolean flushedPartialTuples;
     private int currentTupleIdx;
     private int lastFlushedTupleIdx;
+    private IBatchController batchController;
 
     private final PermutingFrameTupleReference keyTuple;
 
@@ -116,6 +121,8 @@
 
     protected IFrameTupleProcessor createTupleProcessor(SourceLocation sourceLoc) {
         return new IFrameTupleProcessor() {
+            private HyracksThrowingAction exitAction;
+
             @Override
             public void process(ITupleReference tuple, int index) throws HyracksDataException {
                 if (index < currentTupleIdx) {
@@ -219,6 +226,7 @@
                     (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                     appCtx.getTransactionSubsystem().getLogManager());
+            batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
         } catch (Throwable e) { // NOSONAR: Re-thrown
             throw HyracksDataException.create(e);
         }
@@ -227,7 +235,7 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController);
 
         writeBuffer.ensureFrameSize(buffer.capacity());
         if (flushedPartialTuples) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3b8ee68..d2dc3cf 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.runtime.operators;
 
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -62,6 +64,7 @@
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
@@ -117,6 +120,7 @@
     private final ITracer tracer;
     private final long traceCategory;
     private long lastRecordInTimeStamp = 0L;
+    private IBatchController batchController;
 
     public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -289,6 +293,11 @@
                 }
 
                 @Override
+                public void beforeExit(boolean success) throws HyracksDataException {
+                    callback.beforeExit(success);
+                }
+
+                @Override
                 public void close() throws IOException {
                     callback.close();
                 }
@@ -304,6 +313,7 @@
                 }
             };
             frameOpCallback.open();
+            batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
         } catch (Throwable e) { // NOSONAR: Re-thrown
             throw HyracksDataException.create(e);
         }
@@ -344,7 +354,7 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int itemCount = accessor.getTupleCount();
-        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController);
         if (itemCount > 0) {
             lastRecordInTimeStamp = System.currentTimeMillis();
         }
@@ -484,4 +494,5 @@
     public void flush() throws HyracksDataException {
         // No op since nextFrame flushes by default
     }
+
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
new file mode 100644
index 0000000..40465c6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+class StandardBatchController implements IBatchController {
+    static final IBatchController INSTANCE = new StandardBatchController();
+
+    private StandardBatchController() {
+    }
+
+    @Override
+    public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+            throws HyracksDataException {
+        lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
+    }
+
+    @Override
+    public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException {
+        lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
index 0fc24c7..98215bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
@@ -63,7 +63,7 @@
      * @param writer the FrameWriter to write to and flush
      * @throws HyracksDataException
      */
-    public default void flush(IFrameWriter writer) throws HyracksDataException {
+    default void flush(IFrameWriter writer) throws HyracksDataException {
         write(writer, true);
         writer.flush();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
new file mode 100644
index 0000000..7e3d599
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface HyracksThrowingAction {
+    void run() throws HyracksDataException; // NOSONAR
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index c50c6b5..abe62c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -287,6 +287,42 @@
         }
     }
 
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+    public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups)
+            throws HyracksDataException {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (HyracksThrowingAction cleanup : cleanups) {
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT;
+        } else {
+            throw HyracksDataException.create(savedT);
+        }
+    }
+
     /**
      * Runs the supplied action, after suspending any pending interruption. An error will be logged if
      * the action is itself interrupted.
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index c27a7e6..6c073f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -172,13 +172,4 @@
         ++tupleCount;
         IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
     }
-
-    /*
-     * Always write and then flush to send out the message if exists
-     */
-    @Override
-    public void flush(IFrameWriter writer) throws HyracksDataException {
-        write(writer, true);
-        writer.flush();
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
index 75c95b0..b77883d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
@@ -74,4 +74,18 @@
         Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false);
         return sharedMap == null ? null : (T) sharedMap.get(key);
     }
+
+    /**
+     * get a <T> object from the shared map of the task, or returns the default value
+     *
+     * @param key
+     * @param ctx
+     * @param defaultValue
+     * @return the value associated with the key casted as T
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T getOrDefault(String key, IHyracksTaskContext ctx, T defaultValue) {
+        Map<String, T> sharedMap = (Map<String, T>) TaskUtil.getSharedMap(ctx, false);
+        return sharedMap == null ? defaultValue : sharedMap.getOrDefault(key, defaultValue);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
new file mode 100644
index 0000000..e061589
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IBatchController {
+    String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER";
+
+    void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+            throws HyracksDataException;
+
+    void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2..2fbc0c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@
 public interface IFrameOperationCallback extends Closeable {
     /**
      * Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
-     * the pipeline
+     * the pipeline. In the event this frame completion will also exit the component, this will be
+     * called prior to {@link #beforeExit(boolean)}.
      *
      * @throws HyracksDataException
      */
     void frameCompleted() throws HyracksDataException;
 
     /**
-     * Called when the task has failed.
+     * Called just prior to exiting the component on batch completion: not all batches may result
+     * in a component exit, depending on the decision of the {@link IBatchController}.
+     *
+     * @throws HyracksDataException
+     */
+    void beforeExit(boolean success) throws HyracksDataException;
+
+    /**
+     * Called when the batch processing, {@link #frameCompleted()} or {@link #beforeExit(boolean)}
+     * invocation has failed.
      *
      * @param th
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 9e8c568..68de45a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -39,7 +39,6 @@
      * @param tuple
      *            the operation tuple
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException;
 
@@ -54,7 +53,6 @@
      *            the operation tuple
      * @return
      * @throws HyracksDataException
-     * @throws IndexException
      */
     boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
             throws HyracksDataException;
@@ -69,7 +67,6 @@
      * @param pred
      *            the search predicate
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
 
@@ -104,9 +101,7 @@
      * Schedule a merge
      *
      * @param ctx
-     * @param callback
      * @throws HyracksDataException
-     * @throws IndexException
      */
     ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
 
@@ -114,9 +109,7 @@
      * Schedule full merge
      *
      * @param ctx
-     * @param callback
      * @throws HyracksDataException
-     * @throws IndexException
      */
     ILSMIOOperation scheduleFullMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
 
@@ -125,7 +118,6 @@
      *
      * @param operation
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void merge(ILSMIOOperation operation) throws HyracksDataException;
 
@@ -133,7 +125,6 @@
      * Schedule a flush
      *
      * @param ctx
-     * @param callback
      * @throws HyracksDataException
      */
     ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -143,7 +134,6 @@
      *
      * @param operation
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void flush(ILSMIOOperation operation) throws HyracksDataException;
 
@@ -153,7 +143,6 @@
      * @param ioOperation
      *            the io operation that added the new component
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException;
 
@@ -225,20 +214,22 @@
     /**
      * Perform batch operation on all tuples in the passed frame tuple accessor
      *
-     * @param ctx
-     *            the operation ctx
-     * @param accessor
-     *            the frame tuple accessor
-     * @param tuple
-     *            the mutable tuple used to pass the tuple to the processor
-     * @param processor
-     *            the tuple processor
-     * @param frameOpCallback
-     *            the callback at the end of the frame
+     * @param ctx             the operation ctx
+     * @param accessor        the frame tuple accessor
+     * @param tuple           the mutable tuple used to pass the tuple to the processor
+     * @param processor       the tuple processor
+     * @param frameOpCallback the callback at the end of the frame
+     * @param batchController
      * @throws HyracksDataException
      */
     void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException;
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController)
+            throws HyracksDataException;
+
+    void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException;
+
+    void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op)
+            throws HyracksDataException;
 
     /**
      * Rollback components that match the passed predicate
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 717bcce..8ad67f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -688,15 +689,27 @@
         lsmIndex.updateFilter(ctx, tuple);
     }
 
-    private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
+    @Override
+    public void enter(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
         if (!lsmIndex.isMemoryComponentsAllocated()) {
             lsmIndex.allocateMemoryComponents();
         }
-        getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+        getAndEnterComponents(ctx, op, false);
     }
 
-    private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
-        getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION);
+    @Override
+    public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success,
+            LSMOperationType op) throws HyracksDataException {
+        try {
+            callback.beforeExit(success);
+        } catch (Throwable th) {
+            // TODO(mblow): we don't distinguish between the three distinct phases we can encounter
+            //              failures in the callback API- we might need this eventually
+            callback.fail(th);
+            throw th;
+        } finally {
+            getAndExitComponentsAndComplete(ctx, op);
+        }
     }
 
     private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op)
@@ -711,12 +724,15 @@
 
     @Override
     public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController)
+            throws HyracksDataException {
         processor.start();
-        enter(ctx);
+        batchController.batchEnter(ctx, this, frameOpCallback);
+        boolean success = false;
         try {
             try {
                 processFrame(accessor, tuple, processor);
+                success = true;
                 frameOpCallback.frameCompleted();
             } catch (Throwable th) {
                 processor.fail(th);
@@ -728,7 +744,7 @@
             LOGGER.warn("Failed to process frame", e);
             throw e;
         } finally {
-            exit(ctx);
+            batchController.batchExit(ctx, this, frameOpCallback, success);
             ctx.logPerformanceCounters(accessor.getTupleCount());
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 8412b8c..e688727 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -210,8 +211,8 @@
     }
 
     public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
-            IFrameOperationCallback frameOpCallback) throws HyracksDataException {
-        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback);
+            IFrameOperationCallback frameOpCallback, IBatchController batchController) throws HyracksDataException {
+        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController);
     }
 
     @Override