ASTERIXDB-1145: Fix error propagating in operators/connectors:
1. When an AbstractUnarySourceOperator instance runs into an exception, it should call writer.fail() first and then throw the exception.
2. An IFrameWriter.fail() implementation should not throw yet another exception; instead, it should just propagate
the failure to its downstream operators and optionally set a "failed" state so that in the close()/nextFrame() method
it can potentially behave differently from the usual close()/nextFrame().
Change-Id: Ifb538155423687c4aa01a0485adeaab87f291547
Reviewed-on: https://asterix-gerrit.ics.uci.edu/491
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index b64071c..8322cdc 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -108,6 +108,9 @@
try {
startOfPipeline.open();
} catch (HyracksDataException e) {
+ // Tell the downstream operators that the job has failed.
+ startOfPipeline.fail();
+ // Re-throw the exception.
throw e;
} finally {
startOfPipeline.close();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index e20c709..cc517e3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -138,7 +138,9 @@
} else if (eos.get()) {
break;
} else if (failed.get()) {
- throw new HyracksDataException("Failure occurred on input");
+ // Send a failure notification to the downstream operators.
+ // It must not throw an exception here because this is the failure-notification channel.
+ mpw.fail();
} else {
try {
synchronized (this) {
@@ -153,8 +155,8 @@
}
mpw.close();
channel.close();
- delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
- new MaterializedPartitionInputChannel(1, pid, manager))));
+ delegate.addPartitions(Collections
+ .singleton(new PartitionChannel(pid, new MaterializedPartitionInputChannel(1, pid, manager))));
} catch (HyracksException e) {
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 5f63546..11b7c31 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -57,7 +57,9 @@
}
}
if (failed) {
- throw new HyracksDataException("Failure occurred on input");
+ // Do not throw an exception here, to allow the root-cause exception to be propagated to the master first.
+ // Return false to allow the nextFrame(...) call to be a no-op.
+ return false;
}
if (availableFrames <= 0 && eos) {
return false;
@@ -67,12 +69,13 @@
}
/**
- * This implementation works under the truth that one Channel is never shared by two readers.
+ * This implementation works under the assumption that one Channel is never shared by two readers.
* More precisely, one channel only has exact one reader and one writer side.
*
- * @param frame outputFrame
+ * @param frame
+ * outputFrame
* @return {@code true} if succeed to read the data from the channel to the {@code frame}.
- * Otherwise return {@code false} if the end of stream is reached.
+ * Otherwise return {@code false} if the end of stream is reached.
* @throws HyracksDataException
*/
@Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index 6dc8d9a..54f1016 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -106,7 +106,9 @@
return lastReadSender;
}
if (!failSenders.isEmpty()) {
- throw new HyracksDataException("Failure occurred on input");
+ // Do not throw an exception here, to allow the root-cause exception to be propagated to the master first.
+ // Return a negative value to allow the nextFrame(...) call to be a no-op.
+ return -1;
}
for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
channels[i].close();
@@ -127,8 +129,8 @@
}
public synchronized void close() throws HyracksDataException {
- for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions; i = closedSenders
- .nextClearBit(i + 1)) {
+ for (int i = closedSenders.nextClearBit(0); i >= 0
+ && i < nSenderPartitions; i = closedSenders.nextClearBit(i + 1)) {
if (channels[i] != null) {
channels[i].close();
channels[i] = null;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index 584418c..fff3d57 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -70,9 +70,9 @@
int indexFileId = fileMapProvider.lookupFileId(treeIndexHelper.getFileReference());
statsGatherer = new TreeIndexStatsGatherer(bufferCache, treeIndex.getFreePageManager(), indexFileId,
treeIndex.getRootPageId());
- TreeIndexStats stats = statsGatherer.gatherStats(treeIndex.getLeafFrameFactory().createFrame(), treeIndex
- .getInteriorFrameFactory().createFrame(), treeIndex.getFreePageManager().getMetaDataFrameFactory()
- .createFrame());
+ TreeIndexStats stats = statsGatherer.gatherStats(treeIndex.getLeafFrameFactory().createFrame(),
+ treeIndex.getInteriorFrameFactory().createFrame(),
+ treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame());
// Write the stats output as a single string field.
FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
@@ -81,13 +81,13 @@
utf8SerDer.serialize(stats.toString(), dos);
tb.addFieldEndOffset();
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException(
- "Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity()
- + ")");
+ throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
+ + appender.getBuffer().capacity() + ")");
}
appender.flush(writer, false);
} catch (Exception e) {
writer.fail();
+ throw new HyracksDataException(e);
} finally {
writer.close();
treeIndexHelper.close();