Fixed the dataflow protocol so that source operators call fail() on their writers when they encounter an error, ensuring downstream consumers are notified of the failure before close().

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@550 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index 7aa7419..26501ed 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -430,9 +430,11 @@
                 try {
                     readMapOp.mapInput();
                 } catch (Exception e) {
+                    writer.fail();
                     throw new HyracksDataException(e);
+                } finally {
+                    readMapOp.close();
                 }
-                readMapOp.close();
             }
         };
     }
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
index 52f0c8a..b84d576 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReadOperatorDescriptor.java
@@ -194,26 +194,33 @@
                             (Class<? extends Writable>) hadoopRecordReader.createValue().getClass());
                     int nFields = outputRecordDescriptor.getFields().length;
                     ArrayTupleBuilder tb = new ArrayTupleBuilder(nFields);
-                    while (hadoopRecordReader.next(key, value)) {
-                        tb.reset();
-                        switch (nFields) {
-                            case 2:
-                                tb.addField(outputRecordDescriptor.getFields()[0], key);
-                            case 1:
-                                tb.addField(outputRecordDescriptor.getFields()[1], value);
-                        }
-                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                            FrameUtils.flushFrame(outBuffer, writer);
-                            appender.reset(outBuffer, true);
+                    writer.open();
+                    try {
+                        while (hadoopRecordReader.next(key, value)) {
+                            tb.reset();
+                            switch (nFields) {
+                                case 2:
+                                    tb.addField(outputRecordDescriptor.getFields()[0], key);
+                                case 1:
+                                    tb.addField(outputRecordDescriptor.getFields()[1], value);
+                            }
                             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                throw new IllegalStateException();
+                                FrameUtils.flushFrame(outBuffer, writer);
+                                appender.reset(outBuffer, true);
+                                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                    throw new IllegalStateException();
+                                }
                             }
                         }
+                        if (appender.getTupleCount() > 0) {
+                            FrameUtils.flushFrame(outBuffer, writer);
+                        }
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        writer.close();
                     }
-                    if (appender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(outBuffer, writer);
-                    }
-                    writer.close();
                     hadoopRecordReader.close();
                 } catch (InstantiationException e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index 1d580b1..57fa156 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -59,9 +59,13 @@
                     try {
                         in = new FileInputStream(f);
                     } catch (FileNotFoundException e) {
+                        writer.fail();
                         throw new HyracksDataException(e);
                     }
                     tp.parse(in, writer);
+                } catch (Exception e) {
+                    writer.fail();
+                    throw new HyracksDataException(e);
                 } finally {
                     writer.close();
                 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 085efd9..692a55a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -300,6 +300,9 @@
                             long end = System.currentTimeMillis();
                             System.out.println("merge time " + (end - start));
                         }
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
                     } finally {
                         writer.close();
                     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index 66ecd85..a01c80e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -127,8 +127,14 @@
                 public void initialize() throws HyracksDataException {
                     GroupingHashTable table = (GroupingHashTable) env.get(HASHTABLE);
                     writer.open();
-                    table.write(writer);
-                    writer.close();
+                    try {
+                        table.write(writer);
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        writer.close();
+                    }
                     env.set(HASHTABLE, null);
                 }
             };
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 4b2986a..d530560 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -265,43 +265,50 @@
 
                     writer.open();// open for probe
 
-                    ByteBuffer buffer = ctx.allocateFrame();// input
-                    // buffer
-                    int tableSize = (int) (numPartitions * recordsPerFrame * factor);
-                    for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
-                        RunFileWriter buildWriter = buildWriters[partitionid];
-                        RunFileWriter probeWriter = probeWriters[partitionid];
-                        if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
-                            continue;
-                        }
-                        joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
-                                hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                                new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
+                    try {
 
-                        // build
-                        if (buildWriter != null) {
-                            RunFileReader buildReader = buildWriter.createReader();
-                            buildReader.open();
-                            while (buildReader.nextFrame(buffer)) {
-                                ByteBuffer copyBuffer = ctx.allocateFrame();
-                                FrameUtils.copy(buffer, copyBuffer);
-                                joiner.build(copyBuffer);
+                        ByteBuffer buffer = ctx.allocateFrame();// input
+                        // buffer
+                        int tableSize = (int) (numPartitions * recordsPerFrame * factor);
+                        for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
+                            RunFileWriter buildWriter = buildWriters[partitionid];
+                            RunFileWriter probeWriter = probeWriters[partitionid];
+                            if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
+                                continue;
+                            }
+                            joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(),
+                                    rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
+                                    new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
+
+                            // build
+                            if (buildWriter != null) {
+                                RunFileReader buildReader = buildWriter.createReader();
+                                buildReader.open();
+                                while (buildReader.nextFrame(buffer)) {
+                                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                                    FrameUtils.copy(buffer, copyBuffer);
+                                    joiner.build(copyBuffer);
+                                    buffer.clear();
+                                }
+                                buildReader.close();
+                            }
+
+                            // probe
+                            RunFileReader probeReader = probeWriter.createReader();
+                            probeReader.open();
+                            while (probeReader.nextFrame(buffer)) {
+                                joiner.join(buffer, writer);
                                 buffer.clear();
                             }
-                            buildReader.close();
+                            probeReader.close();
+                            joiner.closeJoin(writer);
                         }
-
-                        // probe
-                        RunFileReader probeReader = probeWriter.createReader();
-                        probeReader.open();
-                        while (probeReader.nextFrame(buffer)) {
-                            joiner.join(buffer, writer);
-                            buffer.clear();
-                        }
-                        probeReader.close();
-                        joiner.closeJoin(writer);
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        writer.close();
                     }
-                    writer.close();
                 }
 
                 @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 34a8e5e..ab9b44c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -47,7 +47,13 @@
         if (fieldSlots != null && tupleData != null && tupleSize > 0)
             appender.append(fieldSlots, tupleData, 0, tupleSize);
         writer.open();
-        FrameUtils.flushFrame(writeBuffer, writer);
-        writer.close();
+        try {
+            FrameUtils.flushFrame(writeBuffer, writer);
+        } catch (Exception e) {
+            writer.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index deeff38..448cd8a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -112,14 +112,20 @@
                     RunFileWriter out = (RunFileWriter) env.get(MATERIALIZED_FILE);
                     RunFileReader in = out.createReader();
                     writer.open();
-                    in.open();
-                    while (in.nextFrame(frame)) {
-                        frame.flip();
-                        writer.nextFrame(frame);
-                        frame.clear();
+                    try {
+                        in.open();
+                        while (in.nextFrame(frame)) {
+                            frame.flip();
+                            writer.nextFrame(frame);
+                            frame.clear();
+                        }
+                        in.close();
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        writer.close();
                     }
-                    in.close();
-                    writer.close();
                 }
 
                 @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 8b8429c..647d750 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -79,6 +79,9 @@
                     }
                 }
             }
+        } catch (Exception e) {
+            writer.fail();
+            throw new HyracksDataException(e);
         } finally {
             writer.close();
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 0882898..75929ce 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -120,9 +120,15 @@
                 @Override
                 public void initialize() throws HyracksDataException {
                     writer.open();
-                    FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
-                    frameSorter.flushFrames(writer);
-                    writer.close();
+                    try {
+                        FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
+                        frameSorter.flushFrames(writer);
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                        writer.close();
+                    }
                     env.set(FRAMESORTER, null);
                 }
             };
diff --git a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
index 357760a..1581383 100644
--- a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
+++ b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
@@ -96,6 +96,9 @@
                         }
                     }
                     FrameUtils.flushFrame(outputFrame, writer);
+                } catch (Exception e) {
+                    writer.fail();
+                    throw new HyracksDataException(e);
                 } finally {
                     writer.close();
                 }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 5b5c6e8..2703137 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -52,7 +52,7 @@
         try {
 
             treeIndexOpHelper.init();
-
+            writer.open();
             try {
                 treeIndexOpHelper.getTreeIndex().diskOrderScan(cursor, cursorFrame, metaFrame, diskOrderScanOpCtx);
 
@@ -86,6 +86,9 @@
                 if (appender.getTupleCount() > 0) {
                     FrameUtils.flushFrame(frame, writer);
                 }
+            } catch (Exception e) {
+                writer.fail();
+                throw new HyracksDataException(e);
             } finally {
                 cursor.close();
                 writer.close();