Fixed protocol to call fail() on writers when source operators encounter an error.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@550 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index 7aa7419..26501ed 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -430,9 +430,11 @@
try {
readMapOp.mapInput();
} catch (Exception e) {
+ writer.fail();
throw new HyracksDataException(e);
+ } finally {
+ readMapOp.close();
}
- readMapOp.close();
}
};
}
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 52f0c8a..b84d576 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -194,26 +194,33 @@
(Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
int nFields = outputRecordDescriptor.getFields().length;
ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
- while (hadoopRecordReader.next(key, value)) {
- tb.reset();
- switch (nFields) {
- case 2:
- tb.addField(outputRecordDescriptor.getFields()[0], key);
- case 1:
- tb.addField(outputRecordDescriptor.getFields()[1], value);
- }
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
+ writer.open();
+ try {
+ while (hadoopRecordReader.next(key, value)) {
+ tb.reset();
+ switch (nFields) {
+ case 2:
+ tb.addField(outputRecordDescriptor.getFields()[0], key);
+ case 1:
+ tb.addField(outputRecordDescriptor.getFields()[1], value);
+ }
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
+ FrameUtils.flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
}
}
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outBuffer, writer);
+ }
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
}
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outBuffer, writer);
- }
- writer.close();
hadoopRecordReader.close();
} catch (InstantiationException e) {
throw new HyracksDataException(e);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index 1d580b1..57fa156 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -59,9 +59,13 @@
try {
in = new FileInputStream(f);
} catch (FileNotFoundException e) {
+ writer.fail();
throw new HyracksDataException(e);
}
tp.parse(in, writer);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
} finally {
writer.close();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 085efd9..692a55a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -300,6 +300,9 @@
long end = System.currentTimeMillis();
System.out.println("merge time " + (end - start));
}
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
} finally {
writer.close();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index 66ecd85..a01c80e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -127,8 +127,14 @@
public void initialize() throws HyracksDataException {
GroupingHashTable table = (GroupingHashTable) env.get(HASHTABLE);
writer.open();
- table.write(writer);
- writer.close();
+ try {
+ table.write(writer);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
env.set(HASHTABLE, null);
}
};
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 4b2986a..d530560 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -265,43 +265,50 @@
writer.open();// open for probe
- ByteBuffer buffer = ctx.allocateFrame();// input
- // buffer
- int tableSize = (int) (numPartitions * recordsPerFrame * factor);
- for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
- RunFileWriter buildWriter = buildWriters[partitionid];
- RunFileWriter probeWriter = probeWriters[partitionid];
- if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
- continue;
- }
- joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
- hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
+ try {
- // build
- if (buildWriter != null) {
- RunFileReader buildReader = buildWriter.createReader();
- buildReader.open();
- while (buildReader.nextFrame(buffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(buffer, copyBuffer);
- joiner.build(copyBuffer);
+ ByteBuffer buffer = ctx.allocateFrame();// input
+ // buffer
+ int tableSize = (int) (numPartitions * recordsPerFrame * factor);
+ for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
+ continue;
+ }
+ joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(),
+ rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
+ new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
+
+ // build
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(buffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ joiner.build(copyBuffer);
+ buffer.clear();
+ }
+ buildReader.close();
+ }
+
+ // probe
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(buffer)) {
+ joiner.join(buffer, writer);
buffer.clear();
}
- buildReader.close();
+ probeReader.close();
+ joiner.closeJoin(writer);
}
-
- // probe
- RunFileReader probeReader = probeWriter.createReader();
- probeReader.open();
- while (probeReader.nextFrame(buffer)) {
- joiner.join(buffer, writer);
- buffer.clear();
- }
- probeReader.close();
- joiner.closeJoin(writer);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
}
- writer.close();
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 34a8e5e..ab9b44c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -47,7 +47,13 @@
if (fieldSlots != null && tupleData != null && tupleSize > 0)
appender.append(fieldSlots, tupleData, 0, tupleSize);
writer.open();
- FrameUtils.flushFrame(writeBuffer, writer);
- writer.close();
+ try {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index deeff38..448cd8a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -112,14 +112,20 @@
RunFileWriter out = (RunFileWriter) env.get(MATERIALIZED_FILE);
RunFileReader in = out.createReader();
writer.open();
- in.open();
- while (in.nextFrame(frame)) {
- frame.flip();
- writer.nextFrame(frame);
- frame.clear();
+ try {
+ in.open();
+ while (in.nextFrame(frame)) {
+ frame.flip();
+ writer.nextFrame(frame);
+ frame.clear();
+ }
+ in.close();
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
}
- in.close();
- writer.close();
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 8b8429c..647d750 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -79,6 +79,9 @@
}
}
}
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
} finally {
writer.close();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 0882898..75929ce 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -120,9 +120,15 @@
@Override
public void initialize() throws HyracksDataException {
writer.open();
- FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
- frameSorter.flushFrames(writer);
- writer.close();
+ try {
+ FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
+ frameSorter.flushFrames(writer);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
env.set(FRAMESORTER, null);
}
};
diff --git a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
index 357760a..1581383 100644
--- a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
+++ b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
@@ -96,6 +96,9 @@
}
}
FrameUtils.flushFrame(outputFrame, writer);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
} finally {
writer.close();
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 5b5c6e8..2703137 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -52,7 +52,7 @@
try {
treeIndexOpHelper.init();
-
+ writer.open();
try {
treeIndexOpHelper.getTreeIndex().diskOrderScan(cursor, cursorFrame, metaFrame, diskOrderScanOpCtx);
@@ -86,6 +86,9 @@
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(frame, writer);
}
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
} finally {
cursor.close();
writer.close();