ASTERIXDB-1145: Fix error propagating in operators/connectors:
1. When an AbstractUnarySourceOperator instance runs into an exception, it should call writer.fail() first and then throw the exception.
2. An IFrameWriter.fail() implementation should not throw yet-another exception, instead, it should just propgate
   the failure to its downstream operators and optionally set a "failed" state so that in the close()/nextFrame() method
   it can potentially behave differently from usual close()/nextFrame().

Change-Id: Ifb538155423687c4aa01a0485adeaab87f291547
Reviewed-on: https://asterix-gerrit.ics.uci.edu/491
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index b64071c..8322cdc 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -108,6 +108,9 @@
                 try {
                     startOfPipeline.open();
                 } catch (HyracksDataException e) {
+                    // Tell the downstream the job fails.
+                    startOfPipeline.fail();
+                    // Throws the exception.
                     throw e;
                 } finally {
                     startOfPipeline.close();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index e20c709..cc517e3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -138,7 +138,9 @@
                     } else if (eos.get()) {
                         break;
                     } else if (failed.get()) {
-                        throw new HyracksDataException("Failure occurred on input");
+                        // Sends failure notification to its downstream.
+                        // It's not supposed to throw exception here because it is on the failure notification channel.
+                        mpw.fail();
                     } else {
                         try {
                             synchronized (this) {
@@ -153,8 +155,8 @@
                 }
                 mpw.close();
                 channel.close();
-                delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
-                        new MaterializedPartitionInputChannel(1, pid, manager))));
+                delegate.addPartitions(Collections
+                        .singleton(new PartitionChannel(pid, new MaterializedPartitionInputChannel(1, pid, manager))));
             } catch (HyracksException e) {
             }
         }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 5f63546..11b7c31 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -57,7 +57,9 @@
             }
         }
         if (failed) {
-            throw new HyracksDataException("Failure occurred on input");
+            // Do not throw exception here to allow the root cause exception gets propagated to the master first.
+            // Return false to allow the nextFrame(...) call to be a non-op.
+            return false;
         }
         if (availableFrames <= 0 && eos) {
             return false;
@@ -67,12 +69,13 @@
     }
 
     /**
-     * This implementation works under the truth that one Channel is never shared by two readers.
+     * This implementation works under the truth that one Channel is neverNonDeterministicChannelReader shared by two readers.
      * More precisely, one channel only has exact one reader and one writer side.
      *
-     * @param frame outputFrame
+     * @param frame
+     *            outputFrame
      * @return {@code true} if succeed to read the data from the channel to the {@code frame}.
-     * Otherwise return {@code false} if the end of stream is reached.
+     *         Otherwise return {@code false} if the end of stream is reached.
      * @throws HyracksDataException
      */
     @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index 6dc8d9a..54f1016 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -106,7 +106,9 @@
                 return lastReadSender;
             }
             if (!failSenders.isEmpty()) {
-                throw new HyracksDataException("Failure occurred on input");
+                // Do not throw exception here to allow the root cause exception gets propagated to the master first.
+                // Return a negative value to allow the nextFrame(...) call to be a non-op.
+                return -1;
             }
             for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
                 channels[i].close();
@@ -127,8 +129,8 @@
     }
 
     public synchronized void close() throws HyracksDataException {
-        for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions; i = closedSenders
-                .nextClearBit(i + 1)) {
+        for (int i = closedSenders.nextClearBit(0); i >= 0
+                && i < nSenderPartitions; i = closedSenders.nextClearBit(i + 1)) {
             if (channels[i] != null) {
                 channels[i].close();
                 channels[i] = null;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index 584418c..fff3d57 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -70,9 +70,9 @@
             int indexFileId = fileMapProvider.lookupFileId(treeIndexHelper.getFileReference());
             statsGatherer = new TreeIndexStatsGatherer(bufferCache, treeIndex.getFreePageManager(), indexFileId,
                     treeIndex.getRootPageId());
-            TreeIndexStats stats = statsGatherer.gatherStats(treeIndex.getLeafFrameFactory().createFrame(), treeIndex
-                    .getInteriorFrameFactory().createFrame(), treeIndex.getFreePageManager().getMetaDataFrameFactory()
-                    .createFrame());
+            TreeIndexStats stats = statsGatherer.gatherStats(treeIndex.getLeafFrameFactory().createFrame(),
+                    treeIndex.getInteriorFrameFactory().createFrame(),
+                    treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame());
             // Write the stats output as a single string field.
             FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
             ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
@@ -81,13 +81,13 @@
             utf8SerDer.serialize(stats.toString(), dos);
             tb.addFieldEndOffset();
             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new HyracksDataException(
-                        "Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity()
-                                + ")");
+                throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
+                        + appender.getBuffer().capacity() + ")");
             }
             appender.flush(writer, false);
         } catch (Exception e) {
             writer.fail();
+            throw new HyracksDataException(e);
         } finally {
             writer.close();
             treeIndexHelper.close();