ASTERIXDB-1002: Fix exception handling in EmptyTupleSourceRuntimeFactory
Revisiting the previous fix by calling fail() on pipeline
Change-Id: I19f8c8485e483e4d4efeff939e6bd82c7a04a101
Reviewed-on: https://asterix-gerrit.ics.uci.edu/443
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 6618326..1a7150e 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -20,9 +20,6 @@
import java.nio.ByteBuffer;
-import org.json.JSONException;
-import org.json.JSONObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
@@ -36,6 +33,8 @@
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.json.JSONException;
+import org.json.JSONObject;
public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
@@ -93,10 +92,11 @@
final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
public void initialize() throws HyracksDataException {
IFrameWriter startOfPipeline;
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0
+ ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null;
PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
pipelineOutputRecordDescriptor);
@@ -105,8 +105,13 @@
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
- startOfPipeline.open();
- startOfPipeline.close();
+ try {
+ startOfPipeline.open();
+ } catch (HyracksDataException e) {
+ startOfPipeline.fail();
+ } finally {
+ startOfPipeline.close();
+ }
}
};
}
@@ -120,10 +125,10 @@
@Override
public void open() throws HyracksDataException {
if (startOfPipeline == null) {
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
- RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
- AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0
+ ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null;
+ RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider
+ .getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
try {
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 5b66736..a2b9652 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -43,20 +43,21 @@
public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException {
return new AbstractOneInputSourcePushRuntime() {
- private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
- private FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
+ private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
+ private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
@Override
public void open() throws HyracksDataException {
writer.open();
- try {
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- appender.flush(writer, true);
- } finally {
- writer.close();
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
}
+ appender.flush(writer, true);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
}
};
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 7f6dd10..2ca1e0f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -81,7 +81,9 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
ensureConnected();
- delegate.nextFrame(buffer);
+ if (!failed) {
+ delegate.nextFrame(buffer);
+ }
}
private void ensureConnected() throws HyracksDataException {