ASTERIXDB-1002: Fix exception handling in EmptyTupleSourceRuntimeFactory

Revisiting the previous fix by calling fail() on pipeline

Change-Id: I19f8c8485e483e4d4efeff939e6bd82c7a04a101
Reviewed-on: https://asterix-gerrit.ics.uci.edu/443
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 6618326..1a7150e 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -20,9 +20,6 @@
 
 import java.nio.ByteBuffer;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
@@ -36,6 +33,8 @@
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
@@ -93,10 +92,11 @@
             final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
+            @Override
             public void initialize() throws HyracksDataException {
                 IFrameWriter startOfPipeline;
-                RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
-                        : null;
+                RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0
+                        ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null;
 
                 PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
                         pipelineOutputRecordDescriptor);
@@ -105,8 +105,13 @@
                 } catch (AlgebricksException e) {
                     throw new HyracksDataException(e);
                 }
-                startOfPipeline.open();
-                startOfPipeline.close();
+                try {
+                    startOfPipeline.open();
+                } catch (HyracksDataException e) {
+                    startOfPipeline.fail();
+                } finally {
+                    startOfPipeline.close();
+                }
             }
         };
     }
@@ -120,10 +125,10 @@
             @Override
             public void open() throws HyracksDataException {
                 if (startOfPipeline == null) {
-                    RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
-                            : null;
-                    RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
-                            AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
+                    RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0
+                            ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null;
+                    RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider
+                            .getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
                     PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
                             pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
                     try {
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 5b66736..a2b9652 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -43,20 +43,21 @@
     public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException {
         return new AbstractOneInputSourcePushRuntime() {
 
-            private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
-            private FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
+            private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
+            private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
 
             @Override
             public void open() throws HyracksDataException {
                 writer.open();
-                try {
-                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        throw new IllegalStateException();
-                    }
-                    appender.flush(writer, true);
-                } finally {
-                    writer.close();
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    throw new IllegalStateException();
                 }
+                appender.flush(writer, true);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                writer.close();
             }
         };
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 7f6dd10..2ca1e0f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -81,7 +81,9 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         ensureConnected();
-        delegate.nextFrame(buffer);
+        if (!failed) {
+            delegate.nextFrame(buffer);
+        }
     }
 
     private void ensureConnected() throws HyracksDataException {