[NO ISSUE][STO] Report batch operation failure before exiting
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- In certain cases, caller of a batch operation call are
interested in failure events.
- In those cases, we used to report failure after exiting
the components but with this change, failure reporting
happens before the exit.
Change-Id: I0c22b6bddfe8f12ef8e3c59dae0b0c585137a126
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2956
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index b1a1fcc..453ffa0 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -26,7 +26,6 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -135,9 +134,4 @@
public final IFrameWriter getInputFrameWriter(int index) {
return null;
}
-
- @Override
- public JobId getJobId() {
- return ctx.getJobletContext().getJobId();
- }
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index 2da7193..a52f01e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -21,7 +21,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
public interface IActiveRuntime {
@@ -44,11 +43,6 @@
void stop(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException;
/**
- * @return the job id associated with this active runtime
- */
- JobId getJobId();
-
- /**
* @return the runtime stats for monitoring purposes
*/
default String getStats() {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index ba8074fe..b855981 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -206,6 +206,12 @@
public void finish() throws HyracksDataException {
lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
}
+
+ @Override
+ public void fail(Throwable th) {
+ // We must fail before we exit the components
+ frameOpCallback.fail(th);
+ }
};
tracer = ctx.getJobletContext().getServiceContext().getTracer();
traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
@@ -314,12 +320,7 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int itemCount = accessor.getTupleCount();
- try {
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
- } catch (Throwable th) {// NOSONAR: Must notify of all failures
- frameOpCallback.fail(th);
- throw th;
- }
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
if (itemCount > 0) {
lastRecordInTimeStamp = System.currentTimeMillis();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
index 3fbe6cd..b6192c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -43,4 +43,11 @@
* Called once per batch before ending the batch process
*/
void finish() throws HyracksDataException;
+
+ /**
+ * Called when a failure is encountered processing a frame
+ *
+ * @param th
+ */
+ void fail(Throwable th);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index aa7be86..e9f6f20 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -700,6 +700,9 @@
try {
processFrame(accessor, tuple, processor);
frameOpCallback.frameCompleted();
+ } catch (Throwable th) {
+ processor.fail(th);
+ throw th;
} finally {
processor.finish();
}