[NO ISSUE][HYR][STO] Add pre-exit hook to IFrameOperationCallback
Ext-ref: MB-64229
Change-Id: I911884a0f4f6d66d750e452b6b8049ad67d0b00a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19092
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b3..3ab86c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -47,6 +47,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ // No Op
+ }
+
+ @Override
public void close() throws IOException {
// No Op
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 643c6ab..d2dc3cf 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -293,6 +293,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ callback.beforeExit(success);
+ }
+
+ @Override
public void close() throws IOException {
callback.close();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
index f9f758b..40465c6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -20,6 +20,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -31,12 +32,14 @@
}
@Override
- public void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException {
+ public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException {
lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
}
@Override
- public void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException {
- lsmHarness.exit(ctx, LSMOperationType.MODIFICATION);
+ public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException {
+ lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
index 879a8d2..e061589 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -23,7 +23,9 @@
public interface IBatchController {
String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER";
- void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException;
+ void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException;
- void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException;
+ void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2..2fbc0c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@
public interface IFrameOperationCallback extends Closeable {
/**
* Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
- * the pipeline
+ * the pipeline. In the event this frame completion will also exit the component, this will be
+ * called prior to {@link #beforeExit(boolean)}.
*
* @throws HyracksDataException
*/
void frameCompleted() throws HyracksDataException;
/**
- * Called when the task has failed.
+ * Called just prior to exiting the component on batch completion: not all batches may result
+ * in a component exit, depending on the decision of the {@link IBatchController}.
+ *
+ * @throws HyracksDataException
+ */
+ void beforeExit(boolean success) throws HyracksDataException;
+
+ /**
+ * Called when the batch processing, {@link #frameCompleted()} or {@link #beforeExit(boolean)}
+ * invocation has failed.
*
* @param th
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 721d809..68de45a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -228,7 +228,8 @@
void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException;
- void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException;
+ void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op)
+ throws HyracksDataException;
/**
* Rollback components that match the passed predicate
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 6e20686..017e767 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -698,8 +698,18 @@
}
@Override
- public void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
- getAndExitComponentsAndComplete(ctx, op);
+ public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success,
+ LSMOperationType op) throws HyracksDataException {
+ try {
+ callback.beforeExit(success);
+ } catch (Throwable th) {
+ // TODO(mblow): we don't distinguish between the three distinct phases we can encounter
+ // failures in the callback API- we might need this eventually
+ callback.fail(th);
+ throw th;
+ } finally {
+ getAndExitComponentsAndComplete(ctx, op);
+ }
}
private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op)
@@ -717,10 +727,12 @@
IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController)
throws HyracksDataException {
processor.start();
- batchController.batchEnter(this, ctx);
+ batchController.batchEnter(ctx, this, frameOpCallback);
+ boolean success = false;
try {
try {
processFrame(accessor, tuple, processor);
+ success = true;
frameOpCallback.frameCompleted();
} catch (Throwable th) {
processor.fail(th);
@@ -732,7 +744,7 @@
LOGGER.warn("Failed to process frame", e);
throw e;
} finally {
- batchController.batchExit(this, ctx);
+ batchController.batchExit(ctx, this, frameOpCallback, success);
ctx.logPerformanceCounters(accessor.getTupleCount());
}
}