Merge "Merge branch 'gerrit/trinity' into 'gerrit/goldfish'" into goldfish
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b3..3ab86c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -47,6 +47,11 @@
         }
 
         @Override
+        public void beforeExit(boolean success) throws HyracksDataException {
+            // No Op: nothing to do before the component is exited
+        }
+
+        @Override
         public void close() throws IOException {
             // No Op
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 072a9b6..9b8b103 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.runtime.operators;
 
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
@@ -58,6 +60,7 @@
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -95,12 +98,13 @@
     private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks;
 
     private final IFrameOperationCallback[] frameOpCallbacks;
-    private boolean flushedPartialTuples;
     private final PermutingFrameTupleReference keyTuple;
     private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
     private final IntSet processedTuples = new IntOpenHashSet();
     private final IntSet flushedTuples = new IntOpenHashSet();
     private final SourceLocation sourceLoc;
+    private boolean flushedPartialTuples;
+    private IBatchController batchController;
 
     public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory,
@@ -284,6 +288,7 @@
             searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null);
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
             frameTuple = new FrameTupleReference();
+            batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
         } catch (Throwable e) { // NOSONAR: Re-thrown
             throw HyracksDataException.create(e);
         }
@@ -305,7 +310,8 @@
             LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
             IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
             IFrameTupleProcessor processor = processors[pIdx];
-            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController,
+                    p2tuplesMapEntry.getValue());
         }
 
         writeBuffer.ensureFrameSize(buffer.capacity());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 8bc2b1c..dd9f4070d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.runtime.operators;
 
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -71,6 +73,7 @@
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
@@ -140,11 +143,12 @@
     private final ITracer tracer;
     private final long traceCategory;
     private final ITupleProjector tupleProjector;
-    private long lastRecordInTimeStamp = 0L;
     private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
     private final boolean hasSecondaries;
     private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
     private final int[] fieldPermutation;
+    private long lastRecordInTimeStamp = 0L;
+    private IBatchController batchController;
 
     public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -362,6 +366,11 @@
                 }
 
                 @Override
+                public void beforeExit(boolean success) throws HyracksDataException {
+                    callback.beforeExit(success); // forward to the underlying callback
+                }
+
+                @Override
                 public void close() throws IOException {
                     callback.close();
                 }
@@ -378,6 +387,7 @@
             };
             frameOpCallbacks[i].open();
         }
+        batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
     }
 
     protected void resetSearchPredicate(int tupleIndex) {
@@ -437,7 +447,8 @@
             LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
             IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
             IFrameTupleProcessor processor = processors[pIdx];
-            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController,
+                    p2tuplesMapEntry.getValue());
         }
         if (itemCount > 0) {
             lastRecordInTimeStamp = System.currentTimeMillis();
@@ -624,4 +635,5 @@
             }
         }
     }
+
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
new file mode 100644
index 0000000..40465c6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+class StandardBatchController implements IBatchController { // enters and exits the harness on every batch
+    static final IBatchController INSTANCE = new StandardBatchController(); // stateless singleton
+
+    private StandardBatchController() {
+    }
+
+    @Override
+    public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+            throws HyracksDataException {
+        lsmHarness.enter(ctx, LSMOperationType.MODIFICATION); // callback is not used on entry
+    }
+
+    @Override
+    public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException {
+        lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
index 0fc24c7..98215bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
@@ -63,7 +63,7 @@
      * @param writer the FrameWriter to write to and flush
      * @throws HyracksDataException
      */
-    public default void flush(IFrameWriter writer) throws HyracksDataException {
+    default void flush(IFrameWriter writer) throws HyracksDataException {
         write(writer, true);
         writer.flush();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
new file mode 100644
index 0000000..7e3d599
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface HyracksThrowingAction { // a no-arg, no-result action that may throw HyracksDataException
+    void run() throws HyracksDataException; // NOSONAR
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 3ed17b1..ad5e919 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -288,6 +288,42 @@
         }
     }
 
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+    public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups)
+            throws HyracksDataException { // runs action, then every cleanup, then throws the first failure
+        Throwable savedT = null; // first failure seen; later cleanup failures are added to it as suppressed
+        boolean suppressedInterrupted = false; // true if a suppressed cleanup failure was an InterruptedException
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (HyracksThrowingAction cleanup : cleanups) { // each cleanup runs even if an earlier one failed
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt(); // re-assert the interrupt that was only recorded as suppressed
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT; // Errors propagate unwrapped
+        } else {
+            throw HyracksDataException.create(savedT);
+        }
+    }
+
     public static void tryWithCleanupsUnchecked(Runnable action, Runnable... cleanups) {
         Throwable savedT = null;
         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index c27a7e6..6c073f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -172,13 +172,4 @@
         ++tupleCount;
         IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
     }
-
-    /*
-     * Always write and then flush to send out the message if exists
-     */
-    @Override
-    public void flush(IFrameWriter writer) throws HyracksDataException {
-        write(writer, true);
-        writer.flush();
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
index 75c95b0..b77883d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
@@ -74,4 +74,18 @@
         Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false);
         return sharedMap == null ? null : (T) sharedMap.get(key);
     }
+
+    /**
+     * Returns the object stored under the key in the task's shared map, or the default value if there is none
+     *
+     * @param key the key to look up in the shared map
+     * @param ctx the task context whose shared map is consulted
+     * @param defaultValue the value returned when the task has no shared map or the map has no entry for key
+     * @return the value associated with the key cast to T, or defaultValue
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T getOrDefault(String key, IHyracksTaskContext ctx, T defaultValue) {
+        Map<String, T> sharedMap = (Map<String, T>) TaskUtil.getSharedMap(ctx, false);
+        return sharedMap == null ? defaultValue : sharedMap.getOrDefault(key, defaultValue);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
new file mode 100644
index 0000000..e061589
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IBatchController { // controls how a batch enters and exits the LSM harness
+    String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER"; // task shared-map key for a custom controller
+
+    void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+            throws HyracksDataException; // called by LSMHarness.batchOperate before the frame is processed
+
+    void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException; // called from batchOperate's finally block
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2..2fbc0c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@
 public interface IFrameOperationCallback extends Closeable {
     /**
      * Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
-     * the pipeline
+     * the pipeline. If completing this frame also exits the component, this method is
+     * called before {@link #beforeExit(boolean)}.
      *
      * @throws HyracksDataException
      */
     void frameCompleted() throws HyracksDataException;
 
     /**
-     * Called when the task has failed.
+     * Called just prior to exiting the component on batch completion: not all batches may result
+     * in a component exit, depending on the decision of the {@link IBatchController}.
+     * @param success true if the batch was processed successfully
+     * @throws HyracksDataException
+     */
+    void beforeExit(boolean success) throws HyracksDataException;
+
+    /**
+     * Called when the batch processing, {@link #frameCompleted()} or {@link #beforeExit(boolean)}
+     * invocation has failed.
      *
      * @param th
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 214d9dc..dbf34c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -227,10 +227,17 @@
      *            the callback at the end of the frame
      * @param tuples
      *            the indexes of tuples to process
+     * @param batchController
+     *            the controller of the batch lifecycle
      * @throws HyracksDataException
      */
     void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController,
+            Set<Integer> tuples) throws HyracksDataException;
+
+    void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException;
+
+    void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op)
             throws HyracksDataException;
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 2d04020..d019a08 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -692,15 +693,27 @@
         lsmIndex.updateFilter(ctx, tuple);
     }
 
-    private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
+    @Override
+    public void enter(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
         if (!lsmIndex.isMemoryComponentsAllocated()) {
             lsmIndex.allocateMemoryComponents();
         }
-        getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+        getAndEnterComponents(ctx, op, false); // operation type is now chosen by the caller
     }
 
-    private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
-        getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION);
+    @Override
+    public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success,
+            LSMOperationType op) throws HyracksDataException {
+        try {
+            callback.beforeExit(success); // let the callback act before the components are exited
+        } catch (Throwable th) {
+            // TODO(mblow): we don't distinguish between the three distinct phases in which we can encounter
+            //              failures in the callback API; we might need this eventually
+            callback.fail(th); // report the beforeExit failure to the callback, then propagate it
+            throw th;
+        } finally {
+            getAndExitComponentsAndComplete(ctx, op); // always runs, even when beforeExit failed
+        }
     }
 
     private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op)
@@ -715,13 +728,15 @@
 
     @Override
     public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
-            throws HyracksDataException {
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController,
+            Set<Integer> tuples) throws HyracksDataException {
         processor.start();
-        enter(ctx);
+        batchController.batchEnter(ctx, this, frameOpCallback);
+        boolean success = false;
         try {
             try {
                 processFrame(accessor, tuple, processor, tuples);
+                success = true;
                 frameOpCallback.frameCompleted();
             } catch (Throwable th) {
                 processor.fail(th);
@@ -733,7 +748,7 @@
             LOGGER.warn("Failed to process frame", e);
             throw e;
         } finally {
-            exit(ctx);
+            batchController.batchExit(ctx, this, frameOpCallback, success);
             ctx.logPerformanceCounters(accessor.getTupleCount());
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index fb5984d..c768768 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -211,8 +212,9 @@
     }
 
     public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
-            IFrameOperationCallback frameOpCallback, Set<Integer> tuples) throws HyracksDataException {
-        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, tuples);
+            IFrameOperationCallback frameOpCallback, IBatchController batchController, Set<Integer> tuples)
+            throws HyracksDataException { // passes every argument through to the harness, adding ctx
+        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController, tuples);
     }
 
     @Override