[NO ISSUE][HYR][STO] Add pre-exit hook to IFrameOperationCallback

Ext-ref: MB-64229
Change-Id: I911884a0f4f6d66d750e452b6b8049ad67d0b00a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19092
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b3..3ab86c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -47,6 +47,11 @@
         }
 
         @Override
+        public void beforeExit(boolean success) throws HyracksDataException {
+            // No Op
+        }
+
+        @Override
         public void close() throws IOException {
             // No Op
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 643c6ab..d2dc3cf 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -293,6 +293,11 @@
                 }
 
                 @Override
+                public void beforeExit(boolean success) throws HyracksDataException {
+                    callback.beforeExit(success);
+                }
+
+                @Override
                 public void close() throws IOException {
                     callback.close();
                 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
index f9f758b..40465c6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -20,6 +20,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -31,12 +32,14 @@
     }
 
     @Override
-    public void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException {
+    public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+            throws HyracksDataException {
         lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
     }
 
     @Override
-    public void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException {
-        lsmHarness.exit(ctx, LSMOperationType.MODIFICATION);
+    public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException {
+        lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
index 879a8d2..e061589 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -23,7 +23,9 @@
 public interface IBatchController {
     String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER";
 
-    void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException;
+    void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+            throws HyracksDataException;
 
-    void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException;
+    void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2..2fbc0c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@
 public interface IFrameOperationCallback extends Closeable {
     /**
      * Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
-     * the pipeline
+     * the pipeline. In the event this frame completion will also exit the component, this will be
+     * called prior to {@link #beforeExit(boolean)}.
      *
      * @throws HyracksDataException
      */
     void frameCompleted() throws HyracksDataException;
 
     /**
-     * Called when the task has failed.
+     * Called just prior to exiting the component on batch completion: not all batches may result
+     * in a component exit, depending on the decision of the {@link IBatchController}.
+     *
+     * @throws HyracksDataException
+     */
+    void beforeExit(boolean success) throws HyracksDataException;
+
+    /**
+     * Called when the batch processing, {@link #frameCompleted()} or {@link #beforeExit(boolean)}
+     * invocation has failed.
      *
      * @param th
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 721d809..68de45a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -228,7 +228,8 @@
 
     void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException;
 
-    void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException;
+    void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op)
+            throws HyracksDataException;
 
     /**
      * Rollback components that match the passed predicate
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 6e20686..017e767 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -698,8 +698,18 @@
     }
 
     @Override
-    public void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
-        getAndExitComponentsAndComplete(ctx, op);
+    public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success,
+            LSMOperationType op) throws HyracksDataException {
+        try {
+            callback.beforeExit(success);
+        } catch (Throwable th) {
+            // TODO(mblow): we don't distinguish between the three distinct phases we can encounter
+            //              failures in the callback API- we might need this eventually
+            callback.fail(th);
+            throw th;
+        } finally {
+            getAndExitComponentsAndComplete(ctx, op);
+        }
     }
 
     private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op)
@@ -717,10 +727,12 @@
             IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController)
             throws HyracksDataException {
         processor.start();
-        batchController.batchEnter(this, ctx);
+        batchController.batchEnter(ctx, this, frameOpCallback);
+        boolean success = false;
         try {
             try {
                 processFrame(accessor, tuple, processor);
+                success = true;
                 frameOpCallback.frameCompleted();
             } catch (Throwable th) {
                 processor.fail(th);
@@ -732,7 +744,7 @@
             LOGGER.warn("Failed to process frame", e);
             throw e;
         } finally {
-            batchController.batchExit(this, ctx);
+            batchController.batchExit(ctx, this, frameOpCallback, success);
             ctx.logPerformanceCounters(accessor.getTupleCount());
         }
     }