Added fail() call to IFrameWriter

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@549 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
index dd91f28..065f4c5 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
@@ -15,6 +15,8 @@
 package edu.uci.ics.hyracks.api.channels;
 
 public interface IInputChannelMonitor {
+    public void notifyFailure(IInputChannel channel);
+
     public void notifyDataAvailability(IInputChannel channel, int nFrames);
 
     public void notifyEndOfStream(IInputChannel channel);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
index 0ded474..a85bd7e 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
@@ -18,12 +18,76 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
+/**
+ * {@link IFrameWriter} is the interface implemented by a stream consumer. An
+ * {@link IFrameWriter} could be in one of the following states:
+ * <ul>
+ * <li>INITIAL</li>
+ * <li>OPENED</li>
+ * <li>CLOSED</li>
+ * <li>FAILED</li>
+ * </ul>
+ * A producer follows the following protocol when using an {@link IFrameWriter}.
+ * Initially, the {@link IFrameWriter} is in the INITIAL state.
+ * The first valid call to an {@link IFrameWriter} is always the
+ * {@link IFrameWriter#open()}. This call provides the opportunity for the
+ * {@link IFrameWriter} implementation to allocate any resources for its
+ * processing. Once this call returns, the {@link IFrameWriter} is in the OPENED
+ * state. If an error occurs
+ * during the {@link IFrameWriter#open()} call, a {@link HyracksDataException}
+ * is thrown and it stays in the INITIAL state.
+ * While the {@link IFrameWriter} is in the OPENED state, the producer can call
+ * one of:
+ * <ul>
+ * <li> {@link IFrameWriter#close()} to give up any resources owned by the
+ * {@link IFrameWriter} and enter the CLOSED state.</li>
+ * <li> {@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the
+ * {@link IFrameWriter}. The call returns normally on success and the
+ * {@link IFrameWriter} remains in the OPENED state. On failure, the call throws
+ * a {@link HyracksDataException}, and the {@link IFrameWriter} enters the ERROR
+ * state.</li>
+ * <li> {@link IFrameWriter#fail()} to indicate that stream is to be aborted. The
+ * {@link IFrameWriter} enters the FAILED state.</li>
+ * </ul>
+ * In the FAILED state, the only call allowed is the
+ * {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
+ * state and give up all resources.
+ * No calls are allowed when the {@link IFrameWriter} is in the CLOSED state.
+ * 
+ * Note: If the call to {@link IFrameWriter#open()} failed, the
+ * {@link IFrameWriter#close()} is not called by the producer. So an exceptional
+ * return from the {@link IFrameWriter#open()} call must clean up all partially
+ * allocated resources.
+ * 
+ * @author vinayakb
+ */
 public interface IFrameWriter {
+    /**
+     * First call to allocate any resources.
+     */
     public void open() throws HyracksDataException;
 
+    /**
+     * Provide data to the stream of this {@link IFrameWriter}.
+     * 
+     * @param buffer
+     *            - Buffer containing data.
+     * @throws HyracksDataException
+     */
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
 
-    public void flush() throws HyracksDataException;
+    /**
+     * Indicate that a failure was encountered and the current stream is to be
+     * aborted.
+     * 
+     * @throws HyracksDataException
+     */
+    public void fail() throws HyracksDataException;
 
+    /**
+     * Close this {@link IFrameWriter} and give up all resources.
+     * 
+     * @throws HyracksDataException
+     */
     public void close() throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
index 128576c..4fb6f18 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
@@ -26,12 +26,20 @@
      * Pushes data to the acceptor.
      * 
      * @param data
-     *            - Data pushed to the acceptor. <code>null</code> indicates the end of stream.
+     *            - Data pushed to the acceptor. <code>null</code> indicates the
+     *            end of stream.
      * @throws HyracksDataException
      */
     public void writeData(T data) throws HyracksDataException;
 
     /**
+     * Indicates that the stream has failed.
+     * 
+     * @throws HyracksDataException
+     */
+    public void fail() throws HyracksDataException;
+
+    /**
      * Closes this writer.
      * 
      * @throws Exception
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index e3d28ea..ef44033 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -73,6 +73,7 @@
         userConstraints = new HashSet<Constraint>();
         operatorIdCounter = 0;
         connectorIdCounter = 0;
+        maxAttempts = 5;
     }
 
     public OperatorDescriptorId createOperatorDescriptorId() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index e5d9f64..a3ca82d 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -34,7 +34,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -275,7 +275,7 @@
         if (cpap != null) {
             return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
         }
-        return new PipeliningConnectorPolicy();
+        return new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
     }
 
     private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index e65f59e..6a73902 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -73,8 +73,7 @@
 
     private volatile boolean aborted;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName,
-            Executor executor) {
+    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
         this.joblet = joblet;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
@@ -82,13 +81,13 @@
         fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         counterMap = new HashMap<String, Counter>();
-//        this.inputGlobalVariables = inputGlobalVariables;
+        //        this.inputGlobalVariables = inputGlobalVariables;
         inputGlobalVariables = Collections.emptyMap();
         outputVariables = new HashMap<MultipartName, Object>();
         outputVariableDescriptorMap = new HashMap<MultipartName, WorkflowVariableDescriptor>();
-//        for (WorkflowVariableDescriptor wvd : outputVariableDescriptors) {
-//            outputVariableDescriptorMap.put(wvd.getName(), wvd);
-//        }
+        //        for (WorkflowVariableDescriptor wvd : outputVariableDescriptors) {
+        //            outputVariableDescriptorMap.put(wvd.getName(), wvd);
+        //        }
     }
 
     public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -270,19 +269,19 @@
                 reader.open();
                 try {
                     writer.open();
-                    try {
-                        ByteBuffer buffer = allocateFrame();
-                        while (reader.nextFrame(buffer)) {
-                            if (aborted) {
-                                return;
-                            }
-                            buffer.flip();
-                            writer.nextFrame(buffer);
-                            buffer.compact();
+                    ByteBuffer buffer = allocateFrame();
+                    while (reader.nextFrame(buffer)) {
+                        if (aborted) {
+                            return;
                         }
-                    } finally {
-                        writer.close();
+                        buffer.flip();
+                        writer.nextFrame(buffer);
+                        buffer.compact();
                     }
+                    writer.close();
+                } catch (Exception e) {
+                    writer.fail();
+                    throw e;
                 } finally {
                     reader.close();
                 }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 74542b3..31ce924 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -42,6 +42,8 @@
 
     private boolean eosSent;
 
+    private boolean failed;
+
     private ByteBuffer currentBuffer;
 
     public NetworkOutputChannel(IHyracksRootContext ctx, int nBuffers) {
@@ -55,7 +57,7 @@
 
     @Override
     public synchronized boolean dispatchNetworkEvent() throws IOException {
-        if (aborted) {
+        if (failed || aborted) {
             eos = true;
             return true;
         } else if (key.isWritable()) {
@@ -154,15 +156,15 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
-
+    public void fail() throws HyracksDataException {
+        failed = true;
     }
 
     @Override
     public synchronized void close() throws HyracksDataException {
         eos = true;
         key.interestOps(SelectionKey.OP_WRITE);
-        key.selector().wakeup();        
+        key.selector().wakeup();
     }
 
     @Override
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 971f2b6..e579da7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -120,7 +120,7 @@
         }
 
         @Override
-        public void flush() throws HyracksDataException {
+        public void fail() throws HyracksDataException {
 
         }
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index db2e405..b989805 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -16,6 +16,8 @@
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
@@ -29,6 +31,8 @@
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 
 public class MaterializedPartitionWriter implements IFrameWriter {
+    private static final Logger LOGGER = Logger.getLogger(MaterializedPartitionWriter.class.getName());
+
     protected final IHyracksRootContext ctx;
 
     protected final PartitionManager manager;
@@ -45,6 +49,8 @@
 
     private long size;
 
+    private boolean failed;
+
     public MaterializedPartitionWriter(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
             TaskAttemptId taId, Executor executor) {
         this.ctx = ctx;
@@ -56,10 +62,14 @@
 
     @Override
     public void open() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("open(" + pid + " by " + taId);
+        }
         fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
         handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         size = 0;
+        failed = false;
     }
 
     @Override
@@ -68,14 +78,20 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void fail() throws HyracksDataException {
+        failed = true;
     }
 
     @Override
     public void close() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("close(" + pid + " by " + taId);
+        }
         ctx.getIOManager().close(handle);
-        manager.registerPartition(pid, taId,
-                new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
-                PartitionState.COMMITTED);
+        if (!failed) {
+            manager.registerPartition(pid, taId,
+                    new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
+                    PartitionState.COMMITTED);
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index 1a9fb53..9403736 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -32,6 +32,8 @@
 
     private IFrameWriter delegate;
 
+    private boolean failed;
+
     public PipelinedPartition(PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
         this.manager = manager;
         this.pid = pid;
@@ -57,6 +59,7 @@
     @Override
     public synchronized void open() throws HyracksDataException {
         manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+        failed = false;
         while (delegate == null) {
             try {
                 wait();
@@ -73,13 +76,16 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
-        delegate.flush();
+    public void fail() throws HyracksDataException {
+        failed = true;
+        delegate.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-        manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+        if (!failed) {
+            manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+        }
         delegate.close();
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index c108bef..df76604 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -86,6 +87,8 @@
 
         private boolean eos;
 
+        private boolean failed;
+
         public PartitionWriter(PartitionChannel pc) {
             this.pc = pc;
             nAvailableFrames = 0;
@@ -93,6 +96,12 @@
         }
 
         @Override
+        public synchronized void notifyFailure(IInputChannel channel) {
+            failed = true;
+            notifyAll();
+        }
+
+        @Override
         public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
             nAvailableFrames += nFrames;
             notifyAll();
@@ -121,6 +130,8 @@
                         channel.recycleBuffer(buffer);
                     } else if (eos) {
                         break;
+                    } else if (failed) {
+                        throw new HyracksDataException("Failure occurred on input");
                     } else {
                         try {
                             wait();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
index c64e187..5d592be 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -58,7 +58,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
-        writer.flush();
+    public void fail() throws HyracksDataException {
+        writer.fail();
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
index 96a8102..2ec4b47 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
@@ -52,6 +52,6 @@
     }
 
     @Override
-    public void flush() {
+    public void fail() {
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
index 7c7979b..e8f43cd 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
@@ -98,4 +98,9 @@
         buffer.limit(buffer.capacity());
         frameWriter.nextFrame(buffer);
     }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        frameWriter.fail();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileWriter.java
index 8d1ec1d..6c5a957 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileWriter.java
@@ -25,6 +25,7 @@
 public class RunFileWriter implements IFrameWriter {
     private final FileReference file;
     private final IIOManager ioManager;
+    private boolean failed;
 
     private FileHandle handle;
     private long size;
@@ -39,6 +40,13 @@
         handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_WRITE,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         size = 0;
+        failed = false;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        ioManager.close(handle);
+        failed = true;
     }
 
     @Override
@@ -48,7 +56,9 @@
 
     @Override
     public void close() throws HyracksDataException {
-        ioManager.close(handle);
+        if (!failed) {
+            ioManager.close(handle);
+        }
     }
 
     public FileReference getFileReference() {
@@ -59,11 +69,10 @@
         return size;
     }
 
-    @Override
-    public void flush() throws HyracksDataException {
-    }
-
-    public RunFileReader createReader() {
+    public RunFileReader createReader() throws HyracksDataException {
+        if (failed) {
+            throw new HyracksDataException("createReader() called on a failed RunFileWriter");
+        }
         return new RunFileReader(file, ioManager, size);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index 14c034a..7aa7419 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -56,7 +56,7 @@
         protected OutputCollector<K2, V2> output;
         protected Reporter reporter;
         protected Object mapper;
-        //protected Mapper<K1, V1, K2, V2> mapper;
+        // protected Mapper<K1, V1, K2, V2> mapper;
         protected int partition;
         protected JobConf conf;
         protected IOpenableDataWriter<Object[]> writer;
@@ -96,7 +96,8 @@
                 if (!conf.getUseNewMapper()) {
                     ((org.apache.hadoop.mapred.Mapper) mapper).close();
                 } else {
-                    // do nothing. closing the mapper is handled internally by run method on context. 
+                    // do nothing. closing the mapper is handled internally by
+                    // run method on context.
                 }
             } catch (IOException ioe) {
                 throw new HyracksDataException(ioe);
@@ -118,6 +119,11 @@
         }
 
         @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+
+        @Override
         public void open() throws HyracksDataException {
             initializeMapper();
             writer.open();
@@ -172,16 +178,18 @@
                 } else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
                     conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
                             .toString());
-                    conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
-                            .getStart());
-                    conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
-                            .getLength());
+                    conf.setLong("map.input.start",
+                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
+                    conf.setLong("map.input.length",
+                            ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
                 }
             } catch (Exception e) {
                 e.printStackTrace();
-                // we do not throw the exception here as we are setting additional parameters that may not be 
-                // required by the mapper. If they are  indeed required,  the configure method invoked on the mapper
-                // shall report an exception because of the missing parameters. 
+                // we do not throw the exception here as we are setting
+                // additional parameters that may not be
+                // required by the mapper. If they are indeed required, the
+                // configure method invoked on the mapper
+                // shall report an exception because of the missing parameters.
             }
         }
 
@@ -223,7 +231,9 @@
                             data[1] = value;
                             writer.writeData(data);
                         }
-                    };;;
+                    };
+                    ;
+                    ;
 
                     OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
                             .getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
@@ -245,7 +255,9 @@
                         public Counter getCounter(Enum<?> arg0) {
                             return null;
                         }
-                    };;;
+                    };
+                    ;
+                    ;
                     context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
                             newReader, recordWriter, outputCommitter, statusReporter,
                             (org.apache.hadoop.mapreduce.InputSplit) inputSplit);
@@ -315,9 +327,9 @@
         String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
         try {
             if (hadoopClassFactory == null) {
-                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
-                        .forName(mapOutputKeyClassName), (Class<? extends Writable>) Class
-                        .forName(mapOutputValueClassName));
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
+                        (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
             } else {
                 recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
                         (Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
@@ -361,8 +373,8 @@
         } else {
             Class inputFormatClass = conf.getInputFormat().getClass();
             InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
-            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, super
-                    .createReporter());
+            return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
+                    super.createReporter());
         }
     }
 
@@ -400,8 +412,8 @@
                 }
                 return createSelfReadingMapper(ctx, recordDescriptor, partition);
             } else {
-                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
-                        .getInputRecordDescriptor(this.odId, 0));
+                return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
+                        recordDescProvider.getInputRecordDescriptor(this.odId, 0));
             }
         } catch (Exception e) {
             throw new HyracksDataException(e);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index 5ff528e..33819a8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -45,6 +45,8 @@
 
     private final BitSet eosSenders;
 
+    private final BitSet failSenders;
+
     private BitSet closedSenders;
 
     private int lastReadSender;
@@ -57,6 +59,7 @@
         reader = new FrameReader();
         channels = new IInputChannel[nSenderPartitions];
         eosSenders = new BitSet(nSenderPartitions);
+        failSenders = new BitSet(nSenderPartitions);
         closedSenders = new BitSet(nSenderPartitions);
         closedSenders.or(expectedPartitions);
         closedSenders.flip(0, nSenderPartitions);
@@ -128,6 +131,9 @@
                         }
                         return;
                     }
+                    if (!failSenders.isEmpty()) {
+                        throw new HyracksDataException("Failure occurred on input");
+                    }
                     for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
                         channels[i].close();
                         eosSenders.clear(i);
@@ -161,6 +167,16 @@
         }
 
         @Override
+        public void notifyFailure(IInputChannel channel) {
+            synchronized (NonDeterministicPartitionCollector.this) {
+                PartitionId pid = (PartitionId) channel.getAttachment();
+                int senderIndex = pid.getSenderIndex();
+                failSenders.set(senderIndex);
+                NonDeterministicPartitionCollector.this.notifyAll();
+            }
+        }
+
+        @Override
         public void notifyDataAvailability(IInputChannel channel, int nFrames) {
             synchronized (NonDeterministicPartitionCollector.this) {
                 PartitionId pid = (PartitionId) channel.getAttachment();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
index 8faee7b..96b2a69 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
@@ -178,10 +178,13 @@
 
         private boolean eos;
 
+        private boolean failed;
+
         public InputChannelFrameReader(IInputChannel channel) {
             this.channel = channel;
             availableFrames = 0;
             eos = false;
+            failed = false;
         }
 
         @Override
@@ -191,14 +194,17 @@
         @Override
         public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
             synchronized (this) {
-                while (!eos && availableFrames <= 0) {
+                while (!failed && !eos && availableFrames <= 0) {
                     try {
                         wait();
                     } catch (InterruptedException e) {
                         throw new HyracksDataException(e);
                     }
                 }
-                if (availableFrames <=0 && eos) {
+                if (failed) {
+                    throw new HyracksDataException("Failure occurred on input");
+                }
+                if (availableFrames <= 0 && eos) {
                     return false;
                 }
                 --availableFrames;
@@ -215,6 +221,12 @@
         }
 
         @Override
+        public synchronized void notifyFailure(IInputChannel channel) {
+            failed = true;
+            notifyAll();
+        }
+
+        @Override
         public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
             availableFrames += nFrames;
             notifyAll();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 5297a99..ba66ebc 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -55,6 +55,13 @@
             }
 
             @Override
+            public void fail() throws HyracksDataException {
+                for (int i = 0; i < epWriters.length; ++i) {
+                    epWriters[i].fail();
+                }
+            }
+
+            @Override
             public void close() throws HyracksDataException {
                 for (int i = 0; i < epWriters.length; ++i) {
                     epWriters[i].close();
@@ -67,13 +74,6 @@
                     epWriters[i].open();
                 }
             }
-
-            @Override
-            public void flush() throws HyracksDataException {
-                for (int i = 0; i < epWriters.length; ++i) {
-                    epWriters[i].flush();
-                }
-            }
         };
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 7054535..6b3f3c0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -94,16 +94,9 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void fail() throws HyracksDataException {
         for (int i = 0; i < appenders.length; ++i) {
-            FrameTupleAppender appender = appenders[i];
-            if (appender.getTupleCount() > 0) {
-                ByteBuffer buffer = appender.getBuffer();
-                IFrameWriter frameWriter = pWriters[i];
-                flushFrame(buffer, frameWriter);
-                pWriters[i].flush();
-                appender.reset(buffer, true);
-            }
+            pWriters[i].fail();
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
index 03438d3..f6511b4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
@@ -64,28 +64,31 @@
         public void open() throws HyracksDataException {
             FileSplit split = splits[index];
             RecordDescriptor desc = recordDescriptors[0];
+            IRecordReader reader;
             try {
-                IRecordReader reader = createRecordReader(split.getLocalFile().getFile(), desc);
-                if (desc == null) {
-                    desc = recordDescriptors[0];
-                }
-                writer.open();
-                try {
-                    while (true) {
-                        Object[] record = new Object[desc.getFields().length];
-                        if (!reader.read(record)) {
-                            break;
-                        }
-                        writer.writeData(record);
-                    }
-                } finally {
-                    reader.close();
-                    writer.close();
-                }
+                reader = createRecordReader(split.getLocalFile().getFile(), desc);
             } catch (Exception e) {
                 throw new HyracksDataException(e);
             }
-
+            if (desc == null) {
+                desc = recordDescriptors[0];
+            }
+            writer.open();
+            try {
+                while (true) {
+                    Object[] record = new Object[desc.getFields().length];
+                    if (!reader.read(record)) {
+                        break;
+                    }
+                    writer.writeData(record);
+                }
+            } catch (Exception e) {
+                writer.fail();
+                throw new HyracksDataException(e);
+            } finally {
+                reader.close();
+                writer.close();
+            }
         }
 
         @Override
@@ -97,6 +100,11 @@
         public void writeData(Object[] data) throws HyracksDataException {
             throw new UnsupportedOperationException();
         }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            // do nothing
+        }
     }
 
     @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
index ae1a722..4bb4eb9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
@@ -55,6 +55,10 @@
         }
 
         @Override
+        public void fail() throws HyracksDataException {
+        }
+
+        @Override
         public void writeData(Object[] data) throws HyracksDataException {
             try {
                 writer.write(data);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
index 21e197d..24d76aa 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
@@ -65,12 +65,7 @@
             }
 
             @Override
-            public void flush() throws HyracksDataException {
-                try {
-                    out.flush();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
+            public void fail() throws HyracksDataException {
             }
 
             @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index cd320dd1..42c86ca 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -114,12 +114,7 @@
             }
 
             @Override
-            public void flush() throws HyracksDataException {
-                try {
-                    out.flush();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
+            public void fail() throws HyracksDataException {
             }
 
             @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index fc5f282..4ac72f6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -90,6 +90,11 @@
     }
 
     @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
     public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
         if (index != 0) {
             throw new IllegalArgumentException();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 73de7f5..085efd9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -163,7 +163,7 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
 
                 }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index cf954f6..66ecd85 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -106,7 +106,7 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
                 }
             };
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index b5568e6..a865456 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -78,8 +78,8 @@
             }
 
             @Override
-            public void flush() throws HyracksDataException {
-                pgw.flush();
+            public void fail() throws HyracksDataException {
+                pgw.fail();
             }
 
             @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index 60a1dc3..df7f9f9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -112,9 +112,8 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
-        FrameUtils.flushFrame(appender.getBuffer(), writer);
-        appender.reset(appender.getBuffer(), true);
+    public void fail() throws HyracksDataException {
+        writer.fail();
     }
 
     @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index c60c0af..4b2986a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -216,7 +216,7 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
                 }
             };
             return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 7c9134f..720ea99 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -295,7 +295,7 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
@@ -501,11 +501,6 @@
                     env.set(NUM_PARTITION, null);
                 }
 
-                @Override
-                public void flush() throws HyracksDataException {
-                    writer.flush();
-                }
-
                 private void closeWriter(int i) throws HyracksDataException {
                     RunFileWriter writer = probeWriters[i];
                     if (writer != null) {
@@ -523,6 +518,11 @@
                     }
                     writer.nextFrame(head);
                 }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    writer.fail();
+                }
             };
             return op;
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index ce8be1c..5a28d1c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -147,7 +147,7 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
                 }
             };
             return op;
@@ -186,8 +186,8 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
-                    writer.flush();
+                public void fail() throws HyracksDataException {
+                    writer.fail();
                 }
             };
             return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 9e69e4b..6cabbe6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -103,7 +103,7 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
                 }
             };
             return op;
@@ -143,8 +143,8 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
-                    writer.flush();
+                public void fail() throws HyracksDataException {
+                    writer.fail();
                 }
             };
             return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
index a142c94..3ed7c07 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
@@ -33,11 +33,15 @@
 
         @Override
         public void close() throws HyracksDataException {
-            // writer.writeData(null);
             writer.close();
         }
 
         @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+
+        @Override
         public void open() throws HyracksDataException {
             mapper = mapperFactory.createMapper();
             writer.open();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index e8a6eae..deeff38 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -89,7 +89,7 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
                 }
             };
         }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 5a4ea37..80b2212 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -49,7 +49,7 @@
             }
 
             @Override
-            public void flush() throws HyracksDataException {
+            public void fail() throws HyracksDataException {
             }
         };
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 2177409..d937784 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -43,6 +43,10 @@
         }
 
         @Override
+        public void fail() throws HyracksDataException {
+        }
+
+        @Override
         public void writeData(Object[] data) throws HyracksDataException {
             for (int i = 0; i < data.length; ++i) {
                 System.err.print(StringSerializationUtils.toString(data[i]));
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index ff39cc8..8c83e3a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -39,7 +39,10 @@
             }
 
             @Override
-            public void flush() throws HyracksDataException {
+            public void fail() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.fail();
+                }
             }
 
             @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index 8a3addb..a06bb45 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -71,6 +71,11 @@
                 public void writeData(Object[] data) throws HyracksDataException {
                     buffer.add(data);
                 }
+
+                @Override
+                public void fail() throws HyracksDataException {
+
+                }
             };
             return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
                     getOperatorId(), 0));
@@ -117,6 +122,11 @@
                     }
                     writer.close();
                 }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    writer.fail();
+                }
             };
             return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
                     getOperatorId(), 0));
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 3bde4c4..55659a4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -109,8 +109,8 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
-                    runGen.flush();
+                public void fail() throws HyracksDataException {
+                    runGen.fail();
                 }
             };
             return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index eee6f49..bcfdb89 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -84,7 +84,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void fail() throws HyracksDataException {
     }
 
     public FrameSorter getFrameSorter() {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index d3015b7..0882898 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -99,7 +99,7 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
                 }
             };
             return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index f5ecde6..373ed48 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -69,6 +69,8 @@
 
         private int nClosed;
 
+        private boolean failed;
+
         public UnionOperator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc) {
             nOpened = 0;
             nClosed = 0;
@@ -99,9 +101,12 @@
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
                     synchronized (UnionOperator.this) {
-                        writer.flush();
+                        if (failed) {
+                            writer.fail();
+                        }
+                        failed = true;
                     }
                 }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
index b2c6feb..3345dd4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
@@ -63,7 +63,8 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void fail() throws HyracksDataException {
+        delegate.fail();
     }
 
     @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBuffer.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBuffer.java
deleted file mode 100644
index a8ac4c2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBuffer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.util;
-
-import java.util.Collection;
-
-public class SynchronizedBoundedBuffer<T> {
-    private static final int QUEUE_SIZE = 8192;
-    private Object[] buffer;
-    private int head;
-    private int tail;
-
-    public SynchronizedBoundedBuffer() {
-        buffer = new Object[QUEUE_SIZE];
-        head = 0;
-        tail = 0;
-    }
-
-    public synchronized void put(T o) throws InterruptedException {
-        while (full()) {
-            wait();
-        }
-        buffer[tail] = o;
-        tail = (tail + 1) % QUEUE_SIZE;
-        notifyAll();
-    }
-
-    public synchronized void putAll(Collection<? extends T> c) throws InterruptedException {
-        for (T o : c) {
-            while (full()) {
-                wait();
-            }
-            buffer[tail] = o;
-            tail = (tail + 1) % QUEUE_SIZE;
-        }
-        notifyAll();
-    }
-
-    public synchronized T get() throws InterruptedException {
-        while (empty()) {
-            wait();
-        }
-        T o = (T) buffer[head];
-        head = (head + 1) % QUEUE_SIZE;
-        notifyAll();
-        return o;
-    }
-
-    private boolean empty() {
-        return head == tail;
-    }
-
-    private boolean full() {
-        return (tail + 1) % QUEUE_SIZE == head;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataReader.java
deleted file mode 100644
index b218024..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataReader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.util;
-
-import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class SynchronizedBoundedBufferDataReader implements IOpenableDataReader<Object[]> {
-    private SynchronizedBoundedBuffer<Object[]> queue;
-
-    public SynchronizedBoundedBufferDataReader(SynchronizedBoundedBuffer<Object[]> queue) {
-        this.queue = queue;
-    }
-
-    @Override
-    public Object[] readData() throws HyracksDataException {
-        try {
-            return queue.get();
-        } catch (InterruptedException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void close() {
-        queue = null;
-    }
-
-    @Override
-    public void open() {
-        // do nothing
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataWriter.java
deleted file mode 100644
index 05d3360..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataWriter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.util;
-
-import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class SynchronizedBoundedBufferDataWriter implements IOpenableDataWriter<Object[]> {
-    private SynchronizedBoundedBuffer<Object[]> queue;
-
-    public SynchronizedBoundedBufferDataWriter(SynchronizedBoundedBuffer<Object[]> queue) {
-        this.queue = queue;
-    }
-
-    @Override
-    public void writeData(Object[] data) throws HyracksDataException {
-        try {
-            queue.put(data);
-        } catch (InterruptedException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            queue.put(null);
-        } catch (InterruptedException e) {
-            throw new HyracksDataException(e);
-        }
-        queue = null;
-    }
-
-    @Override
-    public void open() {
-        // do nothing
-    }
-}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 99204d8..8a78e49 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -68,11 +68,10 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-
                 }
 
                 @Override
-                public void flush() throws HyracksDataException {
+                public void fail() throws HyracksDataException {
                 }
             });
         }
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 8a7e59e..f53bfd6 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -206,9 +206,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(writeBuffer, writer);
-        }
+    public void fail() throws HyracksDataException {
+        writer.fail();
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
index 817aeb4..aced388 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
@@ -84,6 +84,6 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void fail() throws HyracksDataException {
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index 496678b..5779805 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -116,6 +116,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void fail() throws HyracksDataException {
+        writer.fail();
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 1a0091a..0af27d6 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -120,6 +120,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void fail() throws HyracksDataException {
+        writer.fail();
     }
 }
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
index d2bd395..9d421b0 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
@@ -106,6 +106,6 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void fail() throws HyracksDataException {
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index 2afd6d8..69e48c0 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -175,9 +175,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(writeBuffer, writer);
-        }
+    public void fail() throws HyracksDataException {
+        writer.fail();
     }
 }
\ No newline at end of file