Changed the IFrameWriter Contract

Updated existing operators and added a test case for BTreeSearchOperatorNodePushable.
With this change, the call to open() itself moves the writer to the OPENED state,
whether it succeeds or fails, and hence close() must always be called.

Change-Id: I03da090002f79f4db7b5b31454ce3ac2b9e40c7f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/551
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
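
For review context, here is a minimal sketch of the producer-side protocol under the revised contract. The push() helper and its arguments are illustrative only; just the IFrameWriter calls and their ordering reflect this patch:

    import java.nio.ByteBuffer;
    import java.util.List;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Hypothetical producer loop: once open() is invoked the writer counts as
    // OPENED even if open() threw, so close() runs unconditionally.
    public final class FrameWriterProtocolSketch {
        public static void push(IFrameWriter writer, List<ByteBuffer> frames) throws HyracksDataException {
            try {
                writer.open();               // OPENED from this point on, success or not
                for (ByteBuffer frame : frames) {
                    writer.nextFrame(frame); // remains OPENED whether this succeeds or fails
                }
            } catch (Throwable th) {
                writer.fail();               // OPENED -> FAILED
                throw th;
            } finally {
                writer.close();              // CLOSED is always reached
            }
        }
    }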
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index 8759f1b..bafe8a7 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -69,6 +69,7 @@
             private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
 
             private boolean first = true;
+            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -86,7 +87,7 @@
                 } catch (AlgebricksException e) {
                     throw new HyracksDataException(e);
                 }
-
+                isOpen = true;
                 writer.open();
             }
 
@@ -103,9 +104,14 @@
 
             @Override
             public void close() throws HyracksDataException {
-                computeAggregate();
-                appendToFrameFromTupleBuilder(tupleBuilder);
-                super.close();
+                if (isOpen) {
+                    try {
+                        computeAggregate();
+                        appendToFrameFromTupleBuilder(tupleBuilder);
+                    } finally {
+                        super.close();
+                    }
+                }
             }
 
             private void computeAggregate() throws HyracksDataException {
@@ -132,7 +138,9 @@
 
             @Override
             public void fail() throws HyracksDataException {
-                writer.fail();
+                if (isOpen) {
+                    writer.fail();
+                }
             }
         };
     }
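
The isOpen bookkeeping above recurs throughout this patch. A standalone sketch of the guard, with a hypothetical delegating writer (only the flag handling mirrors the change):

    import java.nio.ByteBuffer;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // close() and fail() are forwarded downstream only if open() got far
    // enough to open the delegate; otherwise there is nothing to tear down.
    public class GuardedFrameWriter implements IFrameWriter {
        private final IFrameWriter downstream;
        private boolean isOpen = false;

        public GuardedFrameWriter(IFrameWriter downstream) {
            this.downstream = downstream;
        }

        @Override
        public void open() throws HyracksDataException {
            // ... allocate local resources first; if that throws, isOpen stays false ...
            isOpen = true;
            downstream.open();
        }

        @Override
        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
            downstream.nextFrame(buffer);
        }

        @Override
        public void fail() throws HyracksDataException {
            if (isOpen) {
                downstream.fail();
            }
        }

        @Override
        public void close() throws HyracksDataException {
            if (isOpen) {
                downstream.close();
            }
        }
    }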
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 7fd1d05..e57a4ba 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,8 +51,11 @@
 
     @Override
     public void close() throws HyracksDataException {
-        flushIfNotFailed();
-        writer.close();
+        try {
+            flushIfNotFailed();
+        } finally {
+            writer.close();
+        }
     }
 
     protected void flushAndReset() throws HyracksDataException {
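
The reordering above is the core of the fix for this base class: the final flush may throw, but the downstream close() must still run. As an isolated sketch (FlushingWriter and its flush hook are illustrative names, not code from this patch):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // The flush is attempted first; whatever happens, close() is propagated.
    public abstract class FlushingWriter implements IFrameWriter {
        protected final IFrameWriter writer;

        protected FlushingWriter(IFrameWriter writer) {
            this.writer = writer;
        }

        protected abstract void flushIfNotFailed() throws HyracksDataException;

        @Override
        public void close() throws HyracksDataException {
            try {
                flushIfNotFailed();
            } finally {
                writer.close(); // runs even when the flush throws
            }
        }
    }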
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index f10151e..25eb229 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -116,6 +116,7 @@
                         appendNullsToTuple();
                         appendToFrameFromTupleBuilder(tb);
                     }
+
                 }
 
                 @Override
@@ -146,11 +147,11 @@
 
             @Override
             public void open() throws HyracksDataException {
+                writer.open();
                 if (first) {
                     first = false;
                     initAccessAppendRef(ctx);
                 }
-                writer.open();
             }
 
             @Override
@@ -164,6 +165,7 @@
                     startOfPipeline.close();
                 }
             }
+
         };
     }
 }
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 291b92d..bca301f 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -31,8 +31,8 @@
 import org.apache.hyracks.dataflow.std.sort.FrameSorterMergeSort;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
+import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
 
 public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -64,15 +64,14 @@
 
             @Override
             public void open() throws HyracksDataException {
+                writer.open();
                 if (frameSorter == null) {
                     IFrameBufferManager manager = new VariableFrameMemoryManager(
-                            new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
-                            new FrameFreeSlotLastFit());
+                            new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY), new FrameFreeSlotLastFit());
                     frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory,
                             comparatorFactories, outputRecordDesc);
                 }
                 frameSorter.reset();
-                writer.open();
             }
 
             @Override
@@ -87,9 +86,12 @@
 
             @Override
             public void close() throws HyracksDataException {
-                frameSorter.sort();
-                frameSorter.flush(writer);
-                writer.close();
+                try {
+                    frameSorter.sort();
+                    frameSorter.flush(writer);
+                } finally {
+                    writer.close();
+                }
             }
         };
     }
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 9c8a35c..79d9b7d1 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -96,6 +96,7 @@
             private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
             private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
             private boolean first = true;
+            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -111,10 +112,18 @@
                         }
                     }
                 }
+                isOpen = true;
                 writer.open();
             }
 
             @Override
+            public void close() throws HyracksDataException {
+                if (isOpen) {
+                    super.close();
+                }
+            }
+
+            @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();
@@ -158,7 +167,9 @@
 
             @Override
             public void fail() throws HyracksDataException {
-                writer.fail();
+                if (isOpen) {
+                    writer.fail();
+                }
             }
         };
     }
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index 83d0c61..3d1eb06 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -66,9 +66,10 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return new AbstractUnaryInputOperatorNodePushable() {
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+            private final boolean[] isOpen = new boolean[outputArity];
             private final IFrame[] writeBuffers = new IFrame[outputArity];
             private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
             private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
@@ -83,19 +84,52 @@
 
             @Override
             public void close() throws HyracksDataException {
-                // Flush (possibly not full) buffers that have data, and close writers.
+                HyracksDataException hde = null;
                 for (int i = 0; i < outputArity; i++) {
-                    tupleAppender.reset(writeBuffers[i], false);
-                    // ? by JF why didn't clear the buffer ?
-                    tupleAppender.flush(writers[i], false);
-                    writers[i].close();
+                    if (isOpen[i]) {
+                        try {
+                            tupleAppender.reset(writeBuffers[i], false);
+                            // TODO(JF): should the buffer be cleared here?
+                            tupleAppender.flush(writers[i], false);
+                        } catch (Throwable th) {
+                            if (hde == null) {
+                                hde = new HyracksDataException();
+                            }
+                            hde.addSuppressed(th);
+                        } finally {
+                            try {
+                                writers[i].close();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException();
+                                }
+                                hde.addSuppressed(th);
+                            }
+                        }
+                    }
+                }
+                if (hde != null) {
+                    throw hde;
                 }
             }
 
             @Override
             public void fail() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.fail();
+                HyracksDataException hde = null;
+                for (int i = 0; i < outputArity; i++) {
+                    if (isOpen[i]) {
+                        try {
+                            writers[i].fail();
+                        } catch (Throwable th) {
+                            if (hde == null) {
+                                hde = new HyracksDataException();
+                            }
+                            hde.addSuppressed(th);
+                        }
+                    }
+                }
+                if (hde != null) {
+                    throw hde;
                 }
             }
 
@@ -144,8 +178,9 @@
 
             @Override
             public void open() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.open();
+                for (int i = 0; i < writers.length; i++) {
+                    isOpen[i] = true;
+                    writers[i].open();
                 }
                 // Create write buffers.
                 for (int i = 0; i < outputArity; i++) {
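
This suppressed-exception idiom reappears in every multi-writer operator below. A standalone sketch of the close side (FanOutCloser is a hypothetical helper, not part of the patch):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Attempt to close every opened writer, collect failures via
    // addSuppressed, and throw a single exception at the end so that an
    // early failure cannot skip the remaining writers.
    public final class FanOutCloser {
        public static void closeAll(IFrameWriter[] writers, boolean[] isOpen) throws HyracksDataException {
            HyracksDataException hde = null;
            for (int i = 0; i < writers.length; i++) {
                if (isOpen[i]) {
                    try {
                        writers[i].close();
                    } catch (Throwable th) {
                        if (hde == null) {
                            hde = new HyracksDataException(th);
                        } else {
                            hde.addSuppressed(th);
                        }
                    }
                }
            }
            if (hde != null) {
                throw hde;
            }
        }
    }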
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 34bd1e0..8a5f38c 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -90,6 +90,7 @@
             private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
             private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
             private boolean first = true;
+            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -112,10 +113,25 @@
                         throw new HyracksDataException(ae);
                     }
                 }
+                isOpen = true;
                 writer.open();
             }
 
             @Override
+            public void close() throws HyracksDataException {
+                if (isOpen) {
+                    super.close();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                if (isOpen) {
+                    super.fail();
+                }
+            }
+
+            @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 5605485..ef172c7 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -73,7 +73,7 @@
 
             @Override
             public void open() throws HyracksDataException {
-                // if (first) {
+                writer.open();
                 if (evalMaxObjects == null) {
                     initAccessAppendRef(ctx);
                     try {
@@ -85,14 +85,12 @@
                         throw new HyracksDataException(ae);
                     }
                 }
-                writer.open();
                 afterLastTuple = false;
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 if (afterLastTuple) {
-                    // ignore the data
                     return;
                 }
                 tAccess.reset(buffer);
@@ -123,7 +121,6 @@
                             appendTupleToFrame(t);
                         }
                     } else {
-                        // close();
                         afterLastTuple = true;
                         break;
                     }
@@ -136,9 +133,7 @@
                 toSkip = 0; // how many tuples still to skip
                 firstTuple = true;
                 afterLastTuple = false;
-                // if (!afterLastTuple) {
                 super.close();
-                // }
             }
 
             private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
@@ -154,5 +149,4 @@
 
         };
     }
-
 }
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 34754b4..2cea90d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -56,11 +56,11 @@
 
             @Override
             public void open() throws HyracksDataException {
+                writer.open();
                 if (first) {
                     first = false;
                     initAccessAppend(ctx);
                 }
-                writer.open();
             }
 
             @Override
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 416c398..75c3d08 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -53,7 +53,8 @@
 
     /**
      * @param cond
-     * @param projectionList               if projectionList is null, then no projection is performed
+     * @param projectionList
+     *            if projectionList is null, then no projection is performed
      * @param retainNull
      * @param nullPlaceholderVariableIndex
      * @param nullWriterFactory
@@ -83,6 +84,7 @@
             private IScalarEvaluator eval;
             private INullWriter nullWriter = null;
             private ArrayTupleBuilder nullTupleBuilder = null;
+            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -94,6 +96,7 @@
                         throw new HyracksDataException(ae);
                     }
                 }
+                isOpen = true;
                 writer.open();
 
                 //prepare nullTupleBuilder
@@ -107,6 +110,24 @@
             }
 
             @Override
+            public void fail() throws HyracksDataException {
+                if (isOpen) {
+                    super.fail();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                if (isOpen) {
+                    try {
+                        flushIfNotFailed();
+                    } finally {
+                        writer.close();
+                    }
+                }
+            }
+
+            @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 8850436..6b21cda 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -36,7 +36,6 @@
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -96,6 +95,7 @@
 
             @Override
             public void open() throws HyracksDataException {
+                writer.open();
                 initAccessAppendRef(ctx);
                 try {
                     agg = unnestingFactory.createUnnestingEvaluator(ctx);
@@ -103,7 +103,6 @@
                     throw new HyracksDataException(ae);
                 }
                 tupleBuilder = new ArrayTupleBuilder(projectionList.length);
-                writer.open();
             }
 
             @Override
@@ -112,16 +111,12 @@
                 int nTuple = tAccess.getTupleCount();
                 for (int t = 0; t < nTuple; t++) {
                     tRef.reset(tAccess, t);
-
                     try {
                         offsetEval.evaluate(tRef, p);
                     } catch (AlgebricksException e) {
                         throw new HyracksDataException(e);
                     }
-
-                    @SuppressWarnings("static-access")
                     int offset = IntegerPointable.getInteger(p.getByteArray(), p.getStartOffset());
-
                     try {
                         agg.init(tRef);
                         // assume that when unnesting the tuple, each step() call for each element
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
index 260caa1..f6c3ad0b 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
@@ -33,22 +33,18 @@
  * A producer follows the following protocol when using an {@link IFrameWriter}.
  * Initially, the {@link IFrameWriter} is in the INITIAL state.
  * The first valid call to an {@link IFrameWriter} is always the {@link IFrameWriter#open()}. This call provides the opportunity for the {@link IFrameWriter} implementation to allocate any resources for its
- * processing. Once this call returns, the {@link IFrameWriter} is in the OPENED
- * state. If an error occurs
- * during the {@link IFrameWriter#open()} call, a {@link HyracksDataException} is thrown and it stays in the INITIAL state.
- * While the {@link IFrameWriter} is in the OPENED state, the producer can call
- * one of:
+ * processing. Once open() is called, whether the call succeeds or not, the {@link IFrameWriter} is in the OPENED
+ * state.
+ * While the {@link IFrameWriter} is in the OPENED state, the producer can call one of:
  * <ul>
- * <li> {@link IFrameWriter#close()} to give up any resources owned by the {@link IFrameWriter} and enter the CLOSED state.</li>
- * <li> {@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the {@link IFrameWriter}. The call returns normally on success and the {@link IFrameWriter} remains in the OPENED state. On failure, the call throws a {@link HyracksDataException}, and the {@link IFrameWriter} enters the ERROR state.</li>
- * <li> {@link IFrameWriter#fail()} to indicate that stream is to be aborted. The {@link IFrameWriter} enters the FAILED state.</li>
+ * <li>{@link IFrameWriter#close()} to give up any resources owned by the {@link IFrameWriter} and enter the CLOSED state.</li>
+ * <li>{@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the {@link IFrameWriter}. The call returns normally on success and the {@link IFrameWriter} remains in the OPENED state. On failure, the call throws a {@link HyracksDataException}, and the {@link IFrameWriter} remains in the OPENED state.</li>
+ * <li>{@link IFrameWriter#fail()} to indicate that the stream is to be aborted. The {@link IFrameWriter} enters the FAILED state.</li>
  * </ul>
  * In the FAILED state, the only call allowed is the {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
  * state and give up all resources.
  * No calls are allowed when the {@link IFrameWriter} is in the CLOSED state.
- * Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} is not called by the producer. So an exceptional
- * return from the {@link IFrameWriter#open()} call must clean up all partially
- * allocated resources.
+ * Note: Even if the call to {@link IFrameWriter#open()} fails, {@link IFrameWriter#close()} must still be called by the producer.
  *
  * @author vinayakb
  */
@@ -61,7 +57,8 @@
     /**
      * Provide data to the stream of this {@link IFrameWriter}.
      *
-     * @param buffer - Buffer containing data.
+     * @param buffer
+     *            - Buffer containing data.
      * @throws HyracksDataException
      */
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
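
A sketch of the kind of check the new contract enables; the class and names here are hypothetical, not the BTreeSearchOperatorNodePushable test added by this change:

    import java.nio.ByteBuffer;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Even when open() throws, the producer still calls close(), and the
    // writer must tolerate it.
    public class FailingOpenWriterCheck {
        static class FailingOpenWriter implements IFrameWriter {
            boolean closed = false;

            @Override
            public void open() throws HyracksDataException {
                throw new HyracksDataException("open failed");
            }

            @Override
            public void nextFrame(ByteBuffer buffer) {
            }

            @Override
            public void fail() {
            }

            @Override
            public void close() {
                closed = true; // must succeed even though open() failed
            }
        }

        public static void main(String[] args) {
            FailingOpenWriter w = new FailingOpenWriter();
            try {
                w.open();
            } catch (HyracksDataException e) {
                // expected: the writer is nonetheless considered OPENED
            } finally {
                w.close();
            }
            assert w.closed;
        }
    }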
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
index 09dd03d..a46fa7b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -32,16 +32,16 @@
     private final ICounter closeCounter;
     private final ICounter frameCounter;
 
-    public ConnectorSenderProfilingFrameWriter(IHyracksTaskContext ctx, IFrameWriter writer,
-            ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
+    public ConnectorSenderProfilingFrameWriter(IHyracksTaskContext ctx, IFrameWriter writer, ConnectorDescriptorId cdId,
+            int senderIndex, int receiverIndex) {
         this.writer = writer;
         int attempt = ctx.getTaskAttemptId().getAttempt();
-        this.openCounter = ctx.getCounterContext().getCounter(
-                cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".open", true);
-        this.closeCounter = ctx.getCounterContext().getCounter(
-                cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".close", true);
-        this.frameCounter = ctx.getCounterContext().getCounter(
-                cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".nextFrame", true);
+        this.openCounter = ctx.getCounterContext()
+                .getCounter(cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".open", true);
+        this.closeCounter = ctx.getCounterContext()
+                .getCounter(cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".close", true);
+        this.frameCounter = ctx.getCounterContext()
+                .getCounter(cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".nextFrame", true);
     }
 
     @Override
@@ -58,8 +58,11 @@
 
     @Override
     public void close() throws HyracksDataException {
-        closeCounter.update(1);
-        writer.close();
+        try {
+            closeCounter.update(1);
+        } finally {
+            writer.close();
+        }
     }
 
     @Override
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
index 391a636..1ebebda 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -70,10 +69,11 @@
         recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final HadoopHelper helper = new HadoopHelper(config);
         final Configuration conf = helper.getConfiguration();
         final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
@@ -219,18 +219,19 @@
                 for (int i = 0; i < comparatorFactories.length; ++i) {
                     comparators[i] = comparatorFactories[i].createBinaryComparator();
                 }
-                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(),
-                        runGen.getRuns(), new int[] { 0 }, comparators, null,
-                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
+                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(), runGen.getRuns(),
+                        new int[] { 0 }, comparators, null, helper.getMapOutputRecordDescriptorWithoutExtraFields(),
+                        framesLimit, delegatingWriter);
                 merger.process();
             }
         }
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @SuppressWarnings("unchecked")
             @Override
             public void initialize() throws HyracksDataException {
-                writer.open();
                 try {
+                    writer.open();
                     SortingRecordWriter recordWriter = new SortingRecordWriter();
                     InputSplit split = null;
                     int blockId = 0;
@@ -246,9 +247,8 @@
                                 Thread.currentThread().setContextClassLoader(ctxCL);
                             }
                             recordWriter.initBlock(blockId);
-                            Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil()
-                                    .createMapContext(conf, taId, recordReader,
-                                            recordWriter, null, null, split);
+                            Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil().createMapContext(conf, taId,
+                                    recordReader, recordWriter, null, null, split);
                             mapper.run(mCtx);
                             recordReader.close();
                             recordWriter.sortAndFlushBlock(writer);
@@ -259,6 +259,9 @@
                             throw new HyracksDataException(e);
                         }
                     }
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw th;
                 } finally {
                     writer.close();
                 }
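
The same initialize() shape recurs in the source operators below (the file scan, the group merge, the hash joins). A sketch, under the assumption that the base class exposes the writer field as used above; produceFrames() is a hypothetical stand-in for the operator's real work:

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

    // Open the downstream writer inside the try block so that a failing
    // open() still flows through fail() and the unconditional close().
    public abstract class SourcePushableSketch extends AbstractUnaryOutputSourceOperatorNodePushable {
        protected abstract void produceFrames() throws HyracksDataException;

        @Override
        public void initialize() throws HyracksDataException {
            try {
                writer.open();
                produceFrames();
            } catch (Throwable th) {
                writer.fail();
                throw new HyracksDataException(th);
            } finally {
                writer.close();
            }
        }
    }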
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index 3e437e0..ee8c656 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -36,6 +36,7 @@
 public class LocalityAwarePartitionDataWriter implements IFrameWriter {
 
     private final IFrameWriter[] pWriters;
+    private final boolean[] isWriterOpen;
     private final IFrameTupleAppender[] appenders;
     private final FrameTupleAccessor tupleAccessor;
     private final ITuplePartitionComputer tpc;
@@ -46,6 +47,7 @@
         int[] consumerPartitions = localityMap.getConsumers(senderIndex, nConsumerPartitions);
         pWriters = new IFrameWriter[consumerPartitions.length];
         appenders = new IFrameTupleAppender[consumerPartitions.length];
+        isWriterOpen = new boolean[consumerPartitions.length];
         for (int i = 0; i < consumerPartitions.length; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(consumerPartitions[i]);
@@ -67,6 +69,7 @@
     @Override
     public void open() throws HyracksDataException {
         for (int i = 0; i < pWriters.length; ++i) {
+            isWriterOpen[i] = true;
             pWriters[i].open();
         }
     }
@@ -94,8 +97,22 @@
      */
     @Override
     public void fail() throws HyracksDataException {
+        HyracksDataException failException = null;
         for (int i = 0; i < appenders.length; ++i) {
-            pWriters[i].fail();
+            if (isWriterOpen[i]) {
+                try {
+                    pWriters[i].fail();
+                } catch (Throwable th) {
+                    if (failException == null) {
+                        failException = new HyracksDataException(th);
+                    } else {
+                        failException.addSuppressed(th);
+                    }
+                }
+            }
+        }
+        if (failException != null) {
+            throw failException;
         }
     }
 
@@ -106,10 +123,32 @@
      */
     @Override
     public void close() throws HyracksDataException {
+        HyracksDataException closeException = null;
         for (int i = 0; i < pWriters.length; ++i) {
-            appenders[i].flush(pWriters[i], true);
-            pWriters[i].close();
+            if (isWriterOpen[i]) {
+                try {
+                    appenders[i].flush(pWriters[i], true);
+                } catch (Throwable th) {
+                    if (closeException == null) {
+                        closeException = new HyracksDataException(th);
+                    } else {
+                        closeException.addSuppressed(th);
+                    }
+                } finally {
+                    try {
+                        pWriters[i].close();
+                    } catch (Throwable th) {
+                        if (closeException == null) {
+                            closeException = new HyracksDataException(th);
+                        } else {
+                            closeException.addSuppressed(th);
+                        }
+                    }
+                }
+            }
+        }
+        if (closeException != null) {
+            throw closeException;
         }
     }
-
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 1730b22..7a3a019 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -43,8 +43,9 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+        final boolean[] isOpen = new boolean[nConsumerPartitions];
         for (int i = 0; i < nConsumerPartitions; ++i) {
             epWriters[i] = edwFactory.createFrameWriter(i);
         }
@@ -62,21 +63,50 @@
 
             @Override
             public void fail() throws HyracksDataException {
+                HyracksDataException failException = null;
                 for (int i = 0; i < epWriters.length; ++i) {
-                    epWriters[i].fail();
+                    if (isOpen[i]) {
+                        try {
+                            epWriters[i].fail();
+                        } catch (Throwable th) {
+                            if (failException == null) {
+                                failException = new HyracksDataException(th);
+                            } else {
+                                failException.addSuppressed(th);
+                            }
+                        }
+                    }
+                }
+                if (failException != null) {
+                    throw failException;
                 }
             }
 
             @Override
             public void close() throws HyracksDataException {
+                HyracksDataException closeException = null;
                 for (int i = 0; i < epWriters.length; ++i) {
-                    epWriters[i].close();
+                    if (isOpen[i]) {
+                        try {
+                            epWriters[i].close();
+                        } catch (Throwable th) {
+                            if (closeException == null) {
+                                closeException = new HyracksDataException(th);
+                            } else {
+                                closeException.addSuppressed(th);
+                            }
+                        }
+                    }
+                }
+                if (closeException != null) {
+                    throw closeException;
                 }
             }
 
             @Override
             public void open() throws HyracksDataException {
                 for (int i = 0; i < epWriters.length; ++i) {
+                    isOpen[i] = true;
                     epWriters[i].open();
                 }
             }
@@ -84,8 +114,8 @@
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 08df2c5..336272c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -35,6 +35,7 @@
 public class PartitionDataWriter implements IFrameWriter {
     private final int consumerPartitionCount;
     private final IFrameWriter[] pWriters;
+    private final boolean[] isOpen;
     private final FrameTupleAppender[] appenders;
     private final FrameTupleAccessor tupleAccessor;
     private final ITuplePartitionComputer tpc;
@@ -45,6 +46,7 @@
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
         this.consumerPartitionCount = consumerPartitionCount;
         pWriters = new IFrameWriter[consumerPartitionCount];
+        isOpen = new boolean[consumerPartitionCount];
         appenders = new FrameTupleAppender[consumerPartitionCount];
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
@@ -61,17 +63,40 @@
 
     @Override
     public void close() throws HyracksDataException {
+        HyracksDataException closeException = null;
         for (int i = 0; i < pWriters.length; ++i) {
-            if (allocatedFrame) {
-                appenders[i].flush(pWriters[i], true);
+            if (isOpen[i]) {
+                if (allocatedFrame) {
+                    try {
+                        appenders[i].flush(pWriters[i], true);
+                    } catch (Throwable th) {
+                        if (closeException == null) {
+                            closeException = new HyracksDataException(th);
+                        } else {
+                            closeException.addSuppressed(th);
+                        }
+                    }
+                }
+                try {
+                    pWriters[i].close();
+                } catch (Throwable th) {
+                    if (closeException == null) {
+                        closeException = new HyracksDataException(th);
+                    } else {
+                        closeException.addSuppressed(th);
+                    }
+                }
             }
-            pWriters[i].close();
+        }
+        if (closeException != null) {
+            throw closeException;
         }
     }
 
     @Override
     public void open() throws HyracksDataException {
         for (int i = 0; i < pWriters.length; ++i) {
+            isOpen[i] = true;
             pWriters[i].open();
         }
     }
@@ -99,8 +124,22 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        HyracksDataException failException = null;
         for (int i = 0; i < appenders.length; ++i) {
-            pWriters[i].fail();
+            if (isOpen[i]) {
+                try {
+                    pWriters[i].fail();
+                } catch (Throwable th) {
+                    if (failException == null) {
+                        failException = new HyracksDataException(th);
+                    } else {
+                        failException.addSuppressed(th);
+                    }
+                }
+            }
+        }
+        if (failException != null) {
+            throw failException;
         }
     }
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index f94d985..0f3687e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -56,8 +56,8 @@
             @Override
             public void initialize() throws HyracksDataException {
                 File f = split.getLocalFile().getFile();
-                writer.open();
                 try {
+                    writer.open();
                     InputStream in;
                     try {
                         in = new FileInputStream(f);
@@ -66,9 +66,9 @@
                         throw new HyracksDataException(e);
                     }
                     tp.parse(in, writer);
-                } catch (Exception e) {
+                } catch (Throwable th) {
                     writer.fail();
-                    throw new HyracksDataException(e);
+                    throw new HyracksDataException(th);
                 } finally {
                     writer.close();
                 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 814e537..779c631 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -122,8 +122,8 @@
     public void initialize() throws HyracksDataException {
         aggState = (ExternalGroupState) ctx.getStateObject(stateId);
         runs = aggState.getRuns();
-        writer.open();
         try {
+            writer.open();
             if (runs.size() <= 0) {
                 ISpillableTable gTable = aggState.getSpillableTable();
                 if (gTable != null) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7fa9fcc..1c08b53 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -29,8 +29,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.group.AggregateState;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -65,8 +65,8 @@
             RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
         this.groupFields = groupFields;
         this.comparators = comparators;
-        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields,
-                groupFields, writer);
+        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields,
+                writer);
         this.aggregateState = aggregator.createAggregateStates();
         copyFrame = new VSizeFrame(ctx);
         inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
@@ -138,9 +138,9 @@
         for (int j = 0; j < groupFields.length; j++) {
             tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
         }
-        boolean hasOutput = outputPartial ? aggregator.outputPartialResult(tupleBuilder, lastTupleAccessor,
-                lastTupleIndex, aggregateState) : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor,
-                lastTupleIndex, aggregateState);
+        boolean hasOutput = outputPartial
+                ? aggregator.outputPartialResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState)
+                : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
 
         if (hasOutput) {
             appenderWrapper.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
@@ -172,13 +172,16 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (!isFailed && !first) {
-            assert(copyFrameAccessor.getTupleCount() > 0);
-            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
-            appenderWrapper.flush();
+        try {
+            if (!isFailed && !first) {
+                assert (copyFrameAccessor.getTupleCount() > 0);
+                writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+                appenderWrapper.flush();
+            }
+            aggregator.close();
+            aggregateState.close();
+        } finally {
+            appenderWrapper.close();
         }
-        aggregator.close();
-        aggregateState.close();
-        appenderWrapper.close();
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index 349cc5a..8ba7626 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -104,11 +104,8 @@
                 nullWriters1[i] = nullWriterFactories[i].createNullWriter();
             }
         }
-
-        writer.open();// open for probe
-
         try {
-
+            writer.open(); // open for probe
             IFrame buffer = new VSizeFrame(ctx);
             // buffer
             int tableSize = (int) (numPartitions * recordsPerFrame * factor);
@@ -148,9 +145,9 @@
                 probeReader.close();
                 joiner.closeJoin(writer);
             }
-        } catch (Exception e) {
+        } catch (Throwable th) {
             writer.fail();
-            throw new HyracksDataException(e);
+            throw new HyracksDataException(th);
         } finally {
             writer.close();
         }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index f72d528..7badc1e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -188,7 +188,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -201,12 +201,12 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
-                    .createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
-                        .getJobId(), new TaskId(getActivityId(), partition));
+                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
+                        ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
                 private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
                 private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
                         hashFunctionFactories).createPartitioner();
@@ -302,8 +302,8 @@
                         if (memsize > inputsize0) {
                             state.nPartitions = 0;
                         } else {
-                            state.nPartitions = (int) (Math.ceil((inputsize0 * factor / nPartitions - memsize)
-                                    / (memsize - 1)));
+                            state.nPartitions = (int) (Math
+                                    .ceil((inputsize0 * factor / nPartitions - memsize) / (memsize - 1)));
                         }
                         if (state.nPartitions <= 0) {
                             // becomes in-memory HJ
@@ -352,8 +352,8 @@
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
                     RunFileWriter writer = state.fWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                                BuildAndPartitionActivityNode.class.getSimpleName());
+                        FileReference file = ctx.getJobletContext()
+                                .createManagedWorkspaceFile(BuildAndPartitionActivityNode.class.getSimpleName());
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
                         state.fWriters[i] = writer;
@@ -378,7 +378,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -391,8 +391,8 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
-                    .createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private BuildAndPartitionTaskState state;
@@ -413,9 +413,9 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            BUILD_AND_PARTITION_ACTIVITY_ID), partition));
                     writer.open();
+                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
                     buildWriters = state.fWriters;
                     probeWriters = new RunFileWriter[state.nPartitions];
                     bufferForPartitions = new IFrame[state.nPartitions];
@@ -483,65 +483,69 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.joiner.join(inBuffer.getBuffer(), writer);
-                    state.joiner.closeJoin(writer);
-                    ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
-                            .createPartitioner();
-                    ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
-                            .createPartitioner();
-                    if (state.memoryForHashtable != memsize - 2) {
-                        for (int i = 0; i < state.nPartitions; i++) {
-                            ByteBuffer buf = bufferForPartitions[i].getBuffer();
-                            accessorProbe.reset(buf);
-                            if (accessorProbe.getTupleCount() > 0) {
-                                write(i, buf);
+                    try {
+                        state.joiner.join(inBuffer.getBuffer(), writer);
+                        state.joiner.closeJoin(writer);
+                        ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
+                                .createPartitioner();
+                        ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
+                                .createPartitioner();
+                        if (state.memoryForHashtable != memsize - 2) {
+                            for (int i = 0; i < state.nPartitions; i++) {
+                                ByteBuffer buf = bufferForPartitions[i].getBuffer();
+                                accessorProbe.reset(buf);
+                                if (accessorProbe.getTupleCount() > 0) {
+                                    write(i, buf);
+                                }
+                                closeWriter(i);
                             }
-                            closeWriter(i);
-                        }
 
-                        inBuffer.reset();
-                        int tableSize = -1;
-                        if (state.memoryForHashtable == 0) {
-                            tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
-                        } else {
-                            tableSize = (int) (memsize * recordsPerFrame * factor);
-                        }
-                        ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-                        for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
-                            RunFileWriter buildWriter = buildWriters[partitionid];
-                            RunFileWriter probeWriter = probeWriters[partitionid];
-                            if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
-                                continue;
+                            inBuffer.reset();
+                            int tableSize = -1;
+                            if (state.memoryForHashtable == 0) {
+                                tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
+                            } else {
+                                tableSize = (int) (memsize * recordsPerFrame * factor);
                             }
-                            table.reset();
-                            InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0),
-                                    hpcRep0, new FrameTupleAccessor(rd1), hpcRep1, new FrameTuplePairComparator(keys0,
-                                            keys1, comparators), isLeftOuter, nullWriters1, table, predEvaluator);
+                            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+                            for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
+                                RunFileWriter buildWriter = buildWriters[partitionid];
+                                RunFileWriter probeWriter = probeWriters[partitionid];
+                                if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
+                                    continue;
+                                }
+                                table.reset();
+                                InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
+                                        new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
+                                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+                                        nullWriters1, table, predEvaluator);
 
-                            if (buildWriter != null) {
-                                RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
-                                buildReader.open();
-                                while (buildReader.nextFrame(inBuffer)) {
-                                    ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
-                                    FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
-                                    joiner.build(copyBuffer);
+                                if (buildWriter != null) {
+                                    RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
+                                    buildReader.open();
+                                    while (buildReader.nextFrame(inBuffer)) {
+                                        ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
+                                        FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
+                                        joiner.build(copyBuffer);
+                                        inBuffer.reset();
+                                    }
+                                    buildReader.close();
+                                }
+
+                                // probe
+                                RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
+                                probeReader.open();
+                                while (probeReader.nextFrame(inBuffer)) {
+                                    joiner.join(inBuffer.getBuffer(), writer);
                                     inBuffer.reset();
                                 }
-                                buildReader.close();
+                                probeReader.close();
+                                joiner.closeJoin(writer);
                             }
-
-                            // probe
-                            RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
-                            probeReader.open();
-                            while (probeReader.nextFrame(inBuffer)) {
-                                joiner.join(inBuffer.getBuffer(), writer);
-                                inBuffer.reset();
-                            }
-                            probeReader.close();
-                            joiner.closeJoin(writer);
                         }
+                    } finally {
+                        writer.close();
                     }
-                    writer.close();
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
@@ -554,8 +558,8 @@
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
                     RunFileWriter writer = probeWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class
-                                .getSimpleName());
+                        FileReference file = ctx
+                                .createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName());
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
                         probeWriters[i] = writer;
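
Every close() rewritten in this change follows the same shape, forced by the revised IFrameWriter contract: once open() has been called the writer is in the open state, so writer.close() must run on every path, including exceptional ones. A minimal sketch of that shape (doJoinWork() is an illustrative stand-in for the join/spill logic above, not a method in this patch):

    @Override
    public void close() throws HyracksDataException {
        try {
            doJoinWork(); // stand-in for joining spilled partitions and flushing results
        } finally {
            writer.close(); // mandatory: runs even if the join work throws
        }
    }
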
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index dae441b..87ac9bd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -82,8 +82,7 @@
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
             IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
-            INullWriterFactory[] nullWriterFactories1,
-            int tableSize) {
+            INullWriterFactory[] nullWriterFactories1, int tableSize) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
@@ -174,9 +173,8 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ?
-                    null :
-                    predEvaluatorFactory.createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private HashBuildTaskState state;
@@ -187,13 +185,12 @@
                             .createPartitioner();
                     ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
                             .createPartitioner();
-                    state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
+                    state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
                     ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-                    state.joiner = new InMemoryHashJoin(ctx, tableSize,
-                            new FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1), hpc1,
-                            new FrameTuplePairComparator(keys0, keys1,
-                                    comparators), isLeftOuter, nullWriters1, table, predEvaluator);
+                    state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
+                            new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
+                            isLeftOuter, nullWriters1, table, predEvaluator);
                 }
 
                 @Override
@@ -231,9 +228,9 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(), 0),
-                            partition));
                     writer.open();
+                    state = (HashBuildTaskState) ctx
+                            .getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition));
                 }
 
                 @Override
@@ -243,8 +240,11 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.joiner.closeJoin(writer);
-                    writer.close();
+                    try {
+                        state.joiner.closeJoin(writer);
+                    } finally {
+                        writer.close();
+                    }
                 }
 
                 @Override
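
The open() methods are reordered for the same reason: writer.open() now comes first, so the writer enters the open state before any call that can throw, and a matching close() is guaranteed to be legal afterwards. The reordered shape, as applied in the probe activity above:

    @Override
    public void open() throws HyracksDataException {
        writer.open(); // enter the open state before anything that can throw
        state = (HashBuildTaskState) ctx
                .getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition));
    }
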
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 03570c7..a12450e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -132,9 +132,8 @@
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null) ?
-                    predEvaluatorFactory.createPredicateEvaluator() :
-                    null);
+            final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null)
+                    ? predEvaluatorFactory.createPredicateEvaluator() : null);
 
             final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
             if (isLeftOuter) {
@@ -148,12 +147,11 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
+                    state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
 
-                    state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0),
-                            new FrameTupleAccessor(rd1), comparator, memSize, predEvaluator, isLeftOuter,
-                            nullWriters1);
+                    state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0), new FrameTupleAccessor(rd1),
+                            comparator, memSize, predEvaluator, isLeftOuter, nullWriters1);
 
                 }
 
@@ -194,9 +192,9 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (JoinCacheTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            JOIN_CACHE_ACTIVITY_ID), partition));
                     writer.open();
+                    state = (JoinCacheTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID), partition));
                 }
 
                 @Override
@@ -206,8 +204,11 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.joiner.closeJoin(writer);
-                    writer.close();
+                    try {
+                        state.joiner.closeJoin(writer);
+                    } finally {
+                        writer.close();
+                    }
                 }
 
                 @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 0278f92..c0c467a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -43,7 +43,6 @@
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -52,7 +51,6 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
-import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFamily;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
@@ -160,7 +158,7 @@
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
             ITuplePairComparatorFactory tupPaircomparatorFactory0,
             ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
-            throws HyracksDataException {
+                    throws HyracksDataException {
 
         super(spec, 2, 1);
         this.memsize = memsize;
@@ -207,8 +205,7 @@
         if (memorySize > buildSize) {
             return 1; //We will switch to in-Mem HJ eventually
         }
-        numberOfPartitions = (int) (Math.ceil((double) (buildSize * factor / nPartitions - memorySize)
-                / (double) (memorySize - 1)));
+        numberOfPartitions = (int) (Math.ceil((buildSize * factor / nPartitions - memorySize) / (memorySize - 1)));
         if (numberOfPartitions <= 0) {
             numberOfPartitions = 1; //becomes in-memory hash join
         }
@@ -273,12 +270,12 @@
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
 
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
-                    .createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
-                        .getJobId(), new TaskId(getActivityId(), partition));
+                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
+                        ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
 
                 ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
                         hashFunctionGeneratorFactories).createPartitioner(0);
@@ -351,15 +348,15 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
 
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
             final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
             final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
-                    .createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+                    : predEvaluatorFactory.createPredicateEvaluator());
 
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -378,12 +375,11 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-
                     writer.open();
-                    state.hybridHJ.initProbe();
+                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
 
+                    state.hybridHJ.initProbe();
                     LOGGER.fine("OptimizedHybridHashJoin is starting the probe phase.");
                 }
 
@@ -399,45 +395,40 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.hybridHJ.closeProbe(writer);
-
-                    BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
-
-                    rPartbuff.reset();
-                    for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
-
-                        RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
-                        RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
-
-                        if (bReader == null || pReader
-                                == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
-                            continue;
+                    try {
+                        state.hybridHJ.closeProbe(writer);
+                        BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
+                        rPartbuff.reset();
+                        for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus
+                                .nextSetBit(pid + 1)) {
+                            RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
+                            RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
+                            if (bReader == null || pReader == null) { // either side (or both) has no tuples, so no potential match and no need to join
+                                continue;
+                            }
+                            int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
+                            int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
+                            int beforeMax = (bSize > pSize) ? bSize : pSize;
+                            joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
                         }
-                        int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
-                        int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
-                        int beforeMax = (bSize > pSize) ? bSize : pSize;
-                        joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
+                    } finally {
+                        writer.close();
                     }
-                    writer.close();
                     LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
                 }
 
                 private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
                         RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed)
-                        throws HyracksDataException {
+                                throws HyracksDataException {
                     ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
                     ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
 
-                    long buildPartSize = wasReversed ?
-                            (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize()) :
-                            (ohhj
-                                    .getBuildPartitionSize(pid) / ctx.getInitialFrameSize());
-                    long probePartSize = wasReversed ?
-                            (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize()) :
-                            (ohhj
-                                    .getProbePartitionSize(pid) / ctx.getInitialFrameSize());
+                    long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize())
+                            : (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize());
+                    long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize())
+                            : (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize());
 
                     LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + ") (pid "
                             + pid + ") - (level " + level + ") - wasReversed " + wasReversed + " - BuildSize:\t"
@@ -448,12 +439,11 @@
                     if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
                             || (probePartSize < state.memForJoin && !isLeftOuter)) {
                         int tabSize = -1;
-                        if (!forceRR && (isLeftOuter || (buildPartSize
-                                < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+                        if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
                             LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
                                     + level + "]");
-                            tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid) : ohhj
-                                    .getBuildPartitionSizeInTup(pid);
+                            tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid)
+                                    : ohhj.getBuildPartitionSizeInTup(pid);
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
@@ -465,8 +455,8 @@
                             LOGGER.fine(
                                     "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
                                             + level + "]");
-                            tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid) : ohhj
-                                    .getProbePartitionSizeInTup(pid);
+                            tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid)
+                                    : ohhj.getProbePartitionSizeInTup(pid);
                             if (tabSize == 0) {
                                 throw new HyracksDataException(
                                         "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
@@ -480,8 +470,7 @@
                     else {
                         LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
                         OptimizedHybridHashJoin rHHj;
-                        if (!forceRR && (isLeftOuter
-                                || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+                        if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
                             LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                     + level + "]");
                             int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
@@ -513,13 +502,12 @@
                                     : maxAfterProbeSize;
 
                             BitSet rPStatus = rHHj.getPartitionStatus();
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
-                                    * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
                                 LOGGER.fine(
                                         "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                                 + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0);
-                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+                                        .nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -527,16 +515,15 @@
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1),
-                                            false); //checked-confirmed
+                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
                                 }
 
                             } else { //Case 2.1.2 - Switch to NLJ
                                 LOGGER.fine(
                                         "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                                 + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0);
-                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+                                        .nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -585,12 +572,11 @@
                                     : maxAfterProbeSize;
                             BitSet rPStatus = rHHj.getPartitionStatus();
 
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
-                                    * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
                                 LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "
                                         + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0);
-                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+                                        .nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -598,15 +584,14 @@
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1),
-                                            true); //checked-confirmed
+                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
                                 }
                             } else { //Case 2.2.2 - Switch to NLJ
                                 LOGGER.fine(
                                         "\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
                                                 + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0);
-                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+                                        .nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -644,8 +629,7 @@
                     bReader.open();
                     rPartbuff.reset();
                     while (bReader.nextFrame(rPartbuff)) {
-                        ByteBuffer copyBuffer = ctx
-                                .allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
+                        ByteBuffer copyBuffer = ctx.allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
                         FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
                         joiner.build(copyBuffer);
                         rPartbuff.reset();
@@ -665,10 +649,9 @@
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
                         RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator,
                         boolean reverse) throws HyracksDataException {
-                    NestedLoopJoin nlj = new NestedLoopJoin(ctx,
-                            new FrameTupleAccessor(outerRd),
-                            new FrameTupleAccessor(innerRd), nljComparator, memorySize,
-                            predEvaluator, isLeftOuter, nullWriters1);
+                    NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd),
+                            new FrameTupleAccessor(innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter,
+                            nullWriters1);
                     nlj.setIsReversed(reverse);
 
                     IFrame cacheBuff = new VSizeFrame(ctx);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 2398372..0c647d7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -49,9 +49,9 @@
         writer.open();
         try {
             appender.flush(writer, false);
-        } catch (Exception e) {
+        } catch (Throwable th) {
             writer.fail();
-            throw new HyracksDataException(e);
+            throw new HyracksDataException(th);
         } finally {
             writer.close();
         }
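
Widening the catch from Exception to Throwable matters here: under the new contract, a source operator that has opened its writer must signal fail() and then close() no matter what goes wrong, including Errors. The resulting lifecycle for a one-shot source, as in ConstantTupleSourceOperatorNodePushable:

    writer.open();
    try {
        appender.flush(writer, false);
    } catch (Throwable th) {
        writer.fail(); // report the failure downstream first
        throw new HyracksDataException(th);
    } finally {
        writer.close(); // then always leave the open state
    }
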
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index b9c2fb1..f8a8a67 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -52,8 +52,8 @@
     }
 
     public void open(IHyracksTaskContext ctx) throws HyracksDataException {
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                MaterializerTaskState.class.getSimpleName());
+        FileReference file = ctx.getJobletContext()
+                .createManagedWorkspaceFile(MaterializerTaskState.class.getSimpleName());
         out = new RunFileWriter(file, ctx.getIOManager());
         out.open();
     }
@@ -68,16 +68,19 @@
 
     public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
         RunFileReader in = out.createDeleteOnCloseReader();
-        writer.open();
         try {
-            in.open();
-            while (in.nextFrame(frame)) {
-                writer.nextFrame(frame.getBuffer());
+            writer.open();
+            try {
+                in.open();
+                while (in.nextFrame(frame)) {
+                    writer.nextFrame(frame.getBuffer());
+                }
+            } finally {
+                in.close();
             }
-            in.close();
-        } catch (Exception e) {
+        } catch (Throwable th) {
             writer.fail();
-            throw new HyracksDataException(e);
+            throw new HyracksDataException(th);
         } finally {
             writer.close();
         }
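
When two resources are involved, the cleanup nests: an inner finally releases the run-file reader, while the outer catch/finally handles the writer. This guarantees in.close() runs even if a downstream nextFrame() throws, and that writer.fail() precedes the unconditional writer.close():

    RunFileReader in = out.createDeleteOnCloseReader();
    try {
        writer.open();
        try {
            in.open();
            while (in.nextFrame(frame)) {
                writer.nextFrame(frame.getBuffer());
            }
        } finally {
            in.close(); // release the run file even if forwarding a frame failed
        }
    } catch (Throwable th) {
        writer.fail();
        throw new HyracksDataException(th);
    } finally {
        writer.close();
    }
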
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 04b893a..feff13c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -71,8 +71,8 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(new ActivityId(odId,
-                SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(
+                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
         builder.addActivity(this, sma);
         builder.addSourceEdge(0, sma, 0);
         int taskOutputIndex = 0;
@@ -88,8 +88,8 @@
             int activityId = MATERIALIZE_READER_ACTIVITY_ID;
             for (int i = 0; i < outputArity; i++) {
                 if (outputMaterializationFlags[i]) {
-                    MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(new ActivityId(odId,
-                            activityId));
+                    MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
+                            new ActivityId(odId, activityId));
                     builder.addActivity(this, mra);
                     builder.addTargetEdge(i, mra, 0);
                     builder.addBlockingEdge(sma, mra);
@@ -113,15 +113,17 @@
             return new AbstractUnaryInputOperatorNodePushable() {
                 private MaterializerTaskState state;
                 private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+                private final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
 
                 @Override
                 public void open() throws HyracksDataException {
                     if (requiresMaterialization) {
-                        state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(
-                                getActivityId(), partition));
+                        state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
+                                new TaskId(getActivityId(), partition));
                         state.open(ctx);
                     }
                     for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        isOpen[i] = true;
                         writers[i].open();
                     }
                 }
@@ -138,19 +140,50 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    if (requiresMaterialization) {
-                        state.close();
-                        ctx.setStateObject(state);
+                    HyracksDataException hde = null;
+                    try {
+                        if (requiresMaterialization) {
+                            state.close();
+                            ctx.setStateObject(state);
+                        }
+                    } finally {
+                        for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                            if (isOpen[i]) {
+                                try {
+                                    writers[i].close();
+                                } catch (Throwable th) {
+                                    if (hde == null) {
+                                        hde = new HyracksDataException(th);
+                                    } else {
+                                        hde.addSuppressed(th);
+                                    }
+                                }
+                            }
+                        }
                     }
-                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                        writers[i].close();
+                    if (hde != null) {
+                        throw hde;
                     }
                 }
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    HyracksDataException hde = null;
                     for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                        writers[i].fail();
+                        if (isOpen[i]) {
+                            try {
+                                writers[i].fail();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException(th);
+                                } else {
+                                    hde.addSuppressed(th);
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
                     }
                 }
 
@@ -172,21 +205,21 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
                 @Override
                 public void initialize() throws HyracksDataException {
-                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
-                            getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
                     state.writeOut(writer, new VSizeFrame(ctx));
                 }
 
                 @Override
                 public void deinitialize() throws HyracksDataException {
                     numberOfActiveMaterializeReaders--;
-                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
-                            getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
                     if (numberOfActiveMaterializeReaders == 0) {
                         state.deleteFile();
                     }
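
With several downstream writers, a failure while closing one must not skip the rest. The pattern used in both close() and fail() above tracks which writers were actually opened, visits each in turn, remembers the first Throwable as the exception to throw, and attaches later ones via addSuppressed. Extracted for clarity (writers and isOpen mirror the fields in the patch):

    HyracksDataException hde = null;
    for (int i = 0; i < writers.length; i++) {
        if (isOpen[i]) {
            try {
                writers[i].close();
            } catch (Throwable th) {
                if (hde == null) {
                    hde = new HyracksDataException(th);
                } else {
                    hde.addSuppressed(th); // keep every failure, surface the first
                }
            }
        }
    }
    if (hde != null) {
        throw hde;
    }
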
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 8d54c39..74f1cb4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -87,9 +87,6 @@
     private static class SortTaskState extends AbstractStateObject {
         private FrameSorterMergeSort frameSorter;
 
-        public SortTaskState() {
-        }
-
         private SortTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
         }
@@ -165,14 +162,14 @@
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    writer.open();
                     try {
-                        SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
-                                getOperatorId(), SORT_ACTIVITY_ID), partition));
+                        writer.open();
+                        SortTaskState state = (SortTaskState) ctx.getStateObject(
+                                new TaskId(new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
                         state.frameSorter.flush(writer);
-                    } catch (Exception e) {
+                    } catch (Throwable th) {
                         writer.fail();
-                        throw new HyracksDataException(e);
+                        throw new HyracksDataException(th);
                     } finally {
                         writer.close();
                     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index 3457fe8..289b879 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -34,7 +34,8 @@
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
 
 public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
-    public UnionAllOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs, RecordDescriptor recordDescriptor) {
+    public UnionAllOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs,
+            RecordDescriptor recordDescriptor) {
         super(spec, nInputs, 1);
         recordDescriptors[0] = recordDescriptor;
     }
@@ -61,7 +62,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-                throws HyracksDataException {
+                        throws HyracksDataException {
             RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             return new UnionOperator(ctx, inRecordDesc);
         }
@@ -69,9 +70,7 @@
 
     private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
         private int nOpened;
-
         private int nClosed;
-
         private boolean failed;
 
         public UnionOperator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc) {
@@ -106,7 +105,7 @@
                 @Override
                 public void fail() throws HyracksDataException {
                     synchronized (UnionOperator.this) {
-                        if (failed) {
+                        if (!failed) {
                             writer.fail();
                         }
                         failed = true;
@@ -117,6 +116,7 @@
                 public void close() throws HyracksDataException {
                     synchronized (UnionOperator.this) {
                         if (++nClosed == inputArity) {
+                            // close the shared writer exactly once, after the last input closes
                             writer.close();
                         }
                     }
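
Besides the formatting changes, this hunk fixes a real bug: the old guard, if (failed), forwarded fail() only after a failure had already been recorded, so the first failure was silently dropped. With one shared writer fed by several inputs, fail() must be forwarded exactly once (on the first failure) and close() exactly once (after the last input closes):

    @Override
    public void fail() throws HyracksDataException {
        synchronized (UnionOperator.this) {
            if (!failed) {
                writer.fail(); // propagate only the first failure
            }
            failed = true;
        }
    }

    @Override
    public void close() throws HyracksDataException {
        synchronized (UnionOperator.this) {
            if (++nClosed == inputArity) {
                writer.close(); // close once, after the last input closes
            }
        }
    }
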
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
index b0b1d52..431a4f4 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
@@ -79,8 +79,8 @@
 
             @Override
             public void initialize() throws HyracksDataException {
-                writer.open();
                 try {
+                    writer.open();
                     for (int i = 0; i < numRecords; i++) {
                         tb.reset();
                         for (int j = 0; j < recDesc.getFieldCount(); j++) {
@@ -90,14 +90,15 @@
                         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                             appender.flush(writer, true);
                             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+                                throw new HyracksDataException("Record size (" + tb.getSize()
+                                        + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
                             }
                         }
                     }
                     appender.flush(writer, true);
-                } catch (Exception e) {
+                } catch (Throwable th) {
                     writer.fail();
-                    throw new HyracksDataException(e);
+                    throw new HyracksDataException(th);
                 } finally {
                     writer.close();
                 }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
index ed7c08d..5d300f4 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -84,9 +84,11 @@
             public void initialize() throws HyracksDataException {
                 try {
                     writer.open();
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw new HyracksDataException(th);
+                } finally {
                     writer.close();
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
                 }
             }
         };
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 0d06655..26292f8 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -44,7 +43,7 @@
  * To use this operator, a user needs to provide an IKeyValueParserFactory implementation which converts
  * key-value pairs into tuples.
  */
-@SuppressWarnings({ "deprecation", "rawtypes" })
+@SuppressWarnings({ "rawtypes" })
 public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
@@ -91,7 +90,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final InputSplit[] inputSplits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -102,46 +101,50 @@
             public void initialize() throws HyracksDataException {
                 ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
+                    writer.open();
                     Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
                     JobConf conf = confFactory.getConf();
                     conf.setClassLoader(ctx.getJobletContext().getClassLoader());
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
-                    writer.open();
-                    parser.open(writer);
-                    InputFormat inputFormat = conf.getInputFormat();
-                    for (int i = 0; i < inputSplits.length; i++) {
-                        /**
-                         * read all the partitions scheduled to the current node
-                         */
-                        if (scheduledLocations[i].equals(nodeName)) {
+                    try {
+                        parser.open(writer);
+                        InputFormat inputFormat = conf.getInputFormat();
+                        for (int i = 0; i < inputSplits.length; i++) {
                             /**
-                             * pick an unread split to read
-                             * synchronize among simultaneous partitions in the same machine
+                             * read all the partitions scheduled to the current node
                              */
-                            synchronized (executed) {
-                                if (executed[i] == false) {
-                                    executed[i] = true;
-                                } else {
-                                    continue;
+                            if (scheduledLocations[i].equals(nodeName)) {
+                                /**
+                                 * pick an unread split to read
+                                 * synchronize among simultaneous partitions in the same machine
+                                 */
+                                synchronized (executed) {
+                                    if (executed[i] == false) {
+                                        executed[i] = true;
+                                    } else {
+                                        continue;
+                                    }
+                                }
+
+                                /**
+                                 * read the split
+                                 */
+                                RecordReader reader = inputFormat.getRecordReader(inputSplits[i], conf, Reporter.NULL);
+                                Object key = reader.createKey();
+                                Object value = reader.createValue();
+                                while (reader.next(key, value) == true) {
+                                    parser.parse(key, value, writer, inputSplits[i].toString());
                                 }
                             }
-
-                            /**
-                             * read the split
-                             */
-                            RecordReader reader = inputFormat.getRecordReader(inputSplits[i], conf, Reporter.NULL);
-                            Object key = reader.createKey();
-                            Object value = reader.createValue();
-                            while (reader.next(key, value) == true) {
-                                parser.parse(key, value, writer, inputSplits[i].toString());
-                            }
                         }
+                    } finally {
+                        parser.close(writer);
                     }
-                    parser.close(writer);
-                    writer.close();
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw new HyracksDataException(th);
                 } finally {
+                    writer.close();
                     Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 28c6cfe..d69191d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -104,7 +103,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final List<FileSplit> inputSplits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -116,11 +115,11 @@
             public void initialize() throws HyracksDataException {
                 ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
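+                    // As in the reader above, open the writer first so it can always be
+                    // failed and closed in the handlers below.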
+                    writer.open();
                     Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
                     Job job = confFactory.getConf();
                     job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
-                    writer.open();
                     InputFormat inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(),
                             job.getConfiguration());
                     int size = inputSplits.size();
@@ -155,10 +154,11 @@
                         }
                     }
                     parser.close(writer);
-                    writer.close();
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw new HyracksDataException(th);
                 } finally {
+                    writer.close();
                     Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
diff --git a/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks/hyracks-storage-am-btree/pom.xml
index cf14c3b..fba9d2c 100644
--- a/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks/hyracks-storage-am-btree/pom.xml
@@ -16,57 +16,68 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-storage-am-btree</artifactId>
-  <name>hyracks-storage-am-btree</name>
-
-  <parent>
-    <groupId>org.apache.hyracks</groupId>
-    <artifactId>hyracks</artifactId>
-    <version>0.2.17-SNAPSHOT</version>
-  </parent>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
-
-
-
-  <dependencies>
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-storage-common</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>  	
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>hyracks-storage-am-btree</artifactId>
+    <name>hyracks-storage-am-btree</name>
+    <parent>
+        <groupId>org.apache.hyracks</groupId>
+        <artifactId>hyracks</artifactId>
+        <version>0.2.17-SNAPSHOT</version>
+    </parent>
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+            <comments>A business-friendly OSS license</comments>
+        </license>
+    </licenses>
+    <dependencies>
         <dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-storage-am-common</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>  	
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>  	
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>  	
-  </dependencies>
-</project>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-common</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-storage-am-common</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-common</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-std</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>2.0.2-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
new file mode 100644
index 0000000..b5c14c1
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexAccessor;
+import org.apache.hyracks.storage.am.common.api.IIndexCursor;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.api.IndexException;
+import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ BTreeUtils.class, FrameTupleAccessor.class, ArrayTupleBuilder.class,
+        IndexSearchOperatorNodePushable.class, FrameUtils.class, FrameTupleAppender.class })
+public class FramewriterTest {
+
+    private CountAnswer openException = new CountAndThrowException("Exception in open()");
+    private CountAnswer nextFrameException = new CountAndThrowException("Exception in nextFrame()");
+    private CountAnswer failException = new CountAndThrowException("Exception in fail()");
+    private CountAnswer closeException = new CountAndThrowException("Exception in close()");
+    private CountAnswer openError = new CountAndThrowError("Error in open()");
+    private CountAnswer nextFrameError = new CountAndThrowError("Error in nextFrame()");
+    private CountAnswer failError = new CountAndThrowError("Error in fail()");
+    private CountAnswer closeError = new CountAndThrowError("Error in close()");
+    private CountAnswer openNormal = new CountAnswer();
+    private CountAnswer nextFrameNormal = new CountAnswer();
+    private CountAnswer failNormal = new CountAnswer();
+    private CountAnswer closeNormal = new CountAnswer();
+
+    private int successes = 0;
+    private int failures = 0;
+    private static final int BUFFER_SIZE = 32000;
+    private static final int RECORDS_PER_FRAME = 3;
+    public static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(BUFFER_SIZE);
+    private static final int NUMBER_OF_APPENDERS = 2;
+    public int counter = 0;
+
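+    /**
+     * Validates the observed call counts against the writer contract: open(), fail(), and
+     * close() may each be called at most once, no method may be called before open(), and,
+     * once finished, close() must have been called if any other method was.
+     */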
+    public boolean validate(boolean finished) {
+        // tally the observed calls for each lifecycle method
+        int openCount = openException.getCallCount() + openNormal.getCallCount() + openError.getCallCount();
+        int nextFrameCount = nextFrameException.getCallCount() + nextFrameNormal.getCallCount()
+                + nextFrameError.getCallCount();
+        int failCount = failException.getCallCount() + failNormal.getCallCount() + failError.getCallCount();
+        int closeCount = closeException.getCallCount() + closeNormal.getCallCount() + closeError.getCallCount();
+
+        if (failCount > 1 || closeCount > 1 || openCount > 1) {
+            failures++;
+            return false;
+        }
+        if (openCount == 0 && (nextFrameCount > 0 || failCount > 0 || closeCount > 0)) {
+            failures++;
+            return false;
+        }
+        if (finished) {
+            if (closeCount == 0 && (nextFrameCount > 0 || failCount > 0 || openCount > 0)) {
+                failures++;
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    public MultiComparator mockMultiComparator() {
+        MultiComparator mc = Mockito.mock(MultiComparator.class);
+        return mc;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // Mock static methods
+        PowerMockito.mockStatic(BTreeUtils.class);
+        PowerMockito.when(BTreeUtils.getSearchMultiComparator(Matchers.any(), Matchers.any()))
+                .thenReturn(mockMultiComparator());
+        PowerMockito.mockStatic(FrameUtils.class);
+
+        // Custom implementation for FrameUtils that pushes to the next frame immediately
+        PowerMockito.when(
+                FrameUtils.appendToWriter(Matchers.any(IFrameWriter.class), Matchers.any(IFrameTupleAppender.class),
+                        Matchers.any(IFrameTupleAccessor.class), Matchers.anyInt(), Matchers.anyInt()))
+                .thenAnswer(new Answer<Integer>() {
+                    @Override
+                    public Integer answer(InvocationOnMock invocation) throws Throwable {
+                        Object[] args = invocation.getArguments();
+                        IFrameWriter writer = (IFrameWriter) args[0];
+                        writer.nextFrame(EMPTY_BUFFER);
+                        return BUFFER_SIZE;
+                    }
+                });
+
+        // create a global mock for FrameTupleAccessor with a fixed tuple count per frame
+        FrameTupleAccessor frameAccessor = Mockito.mock(FrameTupleAccessor.class);
+        Mockito.when(frameAccessor.getTupleCount()).thenReturn(RECORDS_PER_FRAME);
+
+        // Global custom implementations for FrameTupleAppender.
+        // Since we have two appenders, each test needs to run twice.
+        FrameTupleAppender[] appenders = mockAppenders();
+
+        // Mock all instances of a class. (Note: the class that calls this constructor must be prepared via @PrepareForTest as well.)
+        PowerMockito.whenNew(FrameTupleAccessor.class).withAnyArguments().thenReturn(frameAccessor);
+        PowerMockito.whenNew(FrameTupleAppender.class).withAnyArguments().thenAnswer(new Answer<FrameTupleAppender>() {
+            @Override
+            public FrameTupleAppender answer(InvocationOnMock invocation) throws Throwable {
+                counter++;
+                if (counter % 2 == 1) {
+                    return appenders[0];
+                }
+                return appenders[1];
+            }
+        });
+    }
+
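+    /**
+     * Returns two mocked appenders: one whose flush() forwards a frame to the writer and
+     * succeeds, and one whose flush() always throws.
+     */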
+    public static FrameTupleAppender[] mockAppenders() throws HyracksDataException {
+        FrameTupleAppender[] appenders = new FrameTupleAppender[2];
+        appenders[0] = Mockito.mock(FrameTupleAppender.class);
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                Object[] args = invocation.getArguments();
+                IFrameWriter writer = (IFrameWriter) args[0];
+                writer.nextFrame(EMPTY_BUFFER);
+                return null;
+            }
+        }).when(appenders[0]).flush(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
+
+        appenders[1] = Mockito.mock(FrameTupleAppender.class);
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                throw new HyracksDataException("couldn't flush frame");
+            }
+        }).when(appenders[1]).flush(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
+
+        return appenders;
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    private void resetAllCounters() {
+        openException.reset();
+        nextFrameException.reset();
+        failException.reset();
+        closeException.reset();
+        openNormal.reset();
+        nextFrameNormal.reset();
+        failNormal.reset();
+        closeNormal.reset();
+        openError.reset();
+        nextFrameError.reset();
+        failError.reset();
+        closeError.reset();
+    }
+
+    @Test
+    public void test() {
+        try {
+            testBTreeSearchOperatorNodePushable();
+        } catch (Throwable th) {
+            th.printStackTrace();
+        }
+        System.out.println("Number of passed tests: " + successes);
+        System.out.println("Number of failed tests: " + failures);
+        Assert.assertEquals(0, failures);
+    }
+
+    private void testBTreeSearchOperatorNodePushable() throws Exception {
+        /*
+         * Coverage:
+         * in open():
+         *   writer.open() succeeds vs. throws exception vs. throws error
+         *   indexHelper.open() succeeds vs. throws exception
+         *   createAccessor() succeeds vs. throws exception
+         * in nextFrame():
+         *   indexAccessor.search() succeeds vs. throws exception
+         *   writeSearchResults() succeeds vs. throws exception vs. throws error
+         * in fail():
+         *   writer.fail() succeeds vs. throws exception vs. throws error
+         * in close():
+         *   appender.close() succeeds vs. throws exception vs. throws error
+         */
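+        // Run the whole matrix once per mocked appender so that both the flushing and
+        // the failing appender are exercised.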
+        int i = 0;
+        counter = 0;
+        while (i < NUMBER_OF_APPENDERS) {
+            i++;
+            ByteBuffer buffer = mockByteBuffer();
+            IFrameWriter[] outputFrameWriters = createOutputWriters();
+            for (IFrameWriter outputWriter : outputFrameWriters) {
+                IFrameWriter[] underTest = createWriters();
+                for (IFrameWriter writer : underTest) {
+                    ((AbstractUnaryOutputOperatorNodePushable) writer).setOutputFrameWriter(0, outputWriter,
+                            mockRecordDescriptor());
+                    testWriter(writer, buffer);
+                }
+            }
+            counter = i;
+        }
+    }
+
+    private ByteBuffer mockByteBuffer() {
+        return ByteBuffer.allocate(BUFFER_SIZE);
+    }
+
+    /**
+     * @return a list of writers to test. These writers can be of the same type but behave differently based on the included mocks.
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
+    public IFrameWriter[] createWriters() throws HyracksDataException, IndexException {
+        ArrayList<BTreeSearchOperatorNodePushable> writers = new ArrayList<BTreeSearchOperatorNodePushable>();
+        AbstractTreeIndexOperatorDescriptor[] opDescs = mockIndexOpDesc();
+        IRecordDescriptorProvider[] recordDescProviders = mockRecDescProviders();
+        int partition = 0;
+        IHyracksTaskContext[] ctxs = mockIHyracksTaskContext();
+        int[] keys = { 0 };
+        boolean lowKeyInclusive = true;
+        boolean highKeyInclusive = true;
+        for (AbstractTreeIndexOperatorDescriptor opDesc : opDescs) {
+            for (IRecordDescriptorProvider recordDescProvider : recordDescProviders) {
+                for (IHyracksTaskContext ctx : ctxs) {
+                    BTreeSearchOperatorNodePushable writer = new BTreeSearchOperatorNodePushable(opDesc, ctx, partition,
+                            recordDescProvider, keys, keys, lowKeyInclusive, highKeyInclusive, keys, keys);
+                    writers.add(writer);
+                }
+            }
+        }
+        // Return the writers assembled from the mocks
+        return writers.toArray(new IFrameWriter[writers.size()]);
+    }
+
+    private IHyracksTaskContext[] mockIHyracksTaskContext() throws HyracksDataException {
+        IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class);
+        Mockito.when(ctx.allocateFrame()).thenReturn(mockByteBuffer());
+        Mockito.when(ctx.allocateFrame(Mockito.anyInt())).thenReturn(mockByteBuffer());
+        Mockito.when(ctx.getInitialFrameSize()).thenReturn(BUFFER_SIZE);
+        Mockito.when(ctx.reallocateFrame(Mockito.any(), Mockito.anyInt(), Mockito.anyBoolean()))
+                .thenReturn(mockByteBuffer());
+        return new IHyracksTaskContext[] { ctx };
+    }
+
+    private IRecordDescriptorProvider[] mockRecDescProviders() {
+        RecordDescriptor rDesc = mockRecordDescriptor();
+        IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
+        Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
+        Mockito.when(rDescProvider.getOutputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
+        return new IRecordDescriptorProvider[] { rDescProvider };
+    }
+
+    @SuppressWarnings("rawtypes")
+    private RecordDescriptor mockRecordDescriptor() {
+        ISerializerDeserializer serde = Mockito.mock(ISerializerDeserializer.class);
+        RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[] { serde });
+        return rDesc;
+    }
+
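+    /**
+     * For each mocked accessor, returns one index whose createAccessor() succeeds and one
+     * whose createAccessor() throws.
+     */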
+    public ITreeIndex[] mockIndexes() throws HyracksDataException, IndexException {
+        IIndexAccessor[] indexAccessors = mockIndexAccessors();
+        ITreeIndex[] indexes = new ITreeIndex[indexAccessors.length * 2];
+        int j = 0;
+        for (int i = 0; i < indexAccessors.length; i++) {
+            indexes[j] = Mockito.mock(ITreeIndex.class);
+            Mockito.when(indexes[j].createAccessor(Mockito.any(), Mockito.any())).thenReturn(indexAccessors[i]);
+            j++;
+            indexes[j] = Mockito.mock(ITreeIndex.class);
+            Mockito.when(indexes[j].createAccessor(Mockito.any(), Mockito.any()))
+                    .thenThrow(new HyracksDataException("failed to create accessor"));
+            j++;
+        }
+        return indexes;
+    }
+
+    private IIndexAccessor[] mockIndexAccessors() throws HyracksDataException, IndexException {
+        IIndexCursor[] cursors = mockIndexCursors();
+        IIndexAccessor[] accessors = new IIndexAccessor[cursors.length * 2];
+        int j = 0;
+        for (int i = 0; i < cursors.length; i++) {
+            IIndexCursor cursor = cursors[i];
+            IIndexAccessor accessor = Mockito.mock(IIndexAccessor.class);
+            Mockito.when(accessor.createSearchCursor(Matchers.anyBoolean())).thenReturn(cursor);
+            accessors[j] = accessor;
+            j++;
+            accessor = Mockito.mock(IIndexAccessor.class);
+            Mockito.when(accessor.createSearchCursor(Matchers.anyBoolean())).thenReturn(cursor);
+            Mockito.doAnswer(new Answer<Object>() {
+                private int k = 0;
+
+                @Override
+                public Object answer(InvocationOnMock invocation) throws Throwable {
+                    k++;
+                    if (k % 2 == 0) {
+                        throw new HyracksDataException("Couldn't search index");
+                    }
+                    return null;
+                }
+            }).when(accessor).search(Matchers.any(), Matchers.any());
+            accessors[j] = accessor;
+            j++;
+        }
+
+        return accessors;
+    }
+
+    private IIndexCursor[] mockIndexCursors() throws HyracksDataException, IndexException {
+        ITupleReference[] tuples = mockTuples();
+        IIndexCursor[] cursors = new IIndexCursor[tuples.length * 2];
+        int j = 0;
+        for (int i = 0; i < tuples.length; i++) {
+            IIndexCursor cursor = Mockito.mock(IIndexCursor.class);
+            Mockito.when(cursor.hasNext()).thenReturn(true, true, false);
+            Mockito.when(cursor.getTuple()).thenReturn(tuples[i]);
+            cursors[j] = cursor;
+            j++;
+            cursor = Mockito.mock(IIndexCursor.class);
+            Mockito.when(cursor.hasNext()).thenReturn(true, true, false);
+            Mockito.when(cursor.getTuple()).thenReturn(tuples[i]);
+            Mockito.doThrow(new HyracksDataException("Failed to close cursor")).when(cursor).close();
+            cursors[j] = cursor;
+            j++;
+        }
+        return cursors;
+    }
+
+    private ITupleReference[] mockTuples() {
+        ITupleReference tuple = Mockito.mock(ITupleReference.class);
+        return new ITupleReference[] { tuple };
+    }
+
+    public IIndexDataflowHelper[] mockIndexHelpers() throws HyracksDataException, IndexException {
+        ITreeIndex[] indexes = mockIndexes();
+        IIndexDataflowHelper[] indexHelpers = new IIndexDataflowHelper[indexes.length * 2];
+        int j = 0;
+        for (int i = 0; i < indexes.length; i++) {
+            // normal
+            indexHelpers[j] = Mockito.mock(IIndexDataflowHelper.class);
+            Mockito.when(indexHelpers[j].getIndexInstance()).thenReturn(indexes[i]);
+
+            // throws exception when opened
+            j++;
+            indexHelpers[j] = Mockito.mock(IIndexDataflowHelper.class);
+            Mockito.doThrow(new HyracksDataException("couldn't open index")).when(indexHelpers[j]).open();
+            Mockito.when(indexHelpers[j].getIndexInstance()).thenReturn(null);
+
+            j++;
+        }
+        return indexHelpers;
+    }
+
+    public IIndexDataflowHelperFactory[] mockIndexHelperFactories() throws HyracksDataException, IndexException {
+        IIndexDataflowHelper[] helpers = mockIndexHelpers();
+        IIndexDataflowHelperFactory[] indexHelperFactories = new IIndexDataflowHelperFactory[helpers.length];
+        for (int i = 0; i < helpers.length; i++) {
+            indexHelperFactories[i] = Mockito.mock(IIndexDataflowHelperFactory.class);
+            Mockito.when(
+                    indexHelperFactories[i].createIndexDataflowHelper(Mockito.any(), Mockito.any(), Mockito.anyInt()))
+                    .thenReturn(helpers[i]);
+        }
+        return indexHelperFactories;
+    }
+
+    public AbstractTreeIndexOperatorDescriptor[] mockIndexOpDesc() throws HyracksDataException, IndexException {
+        IIndexDataflowHelperFactory[] indexDataflowHelperFactories = mockIndexHelperFactories();
+        ISearchOperationCallbackFactory[] searchOpCallbackFactories = mockSearchOpCallbackFactories();
+        AbstractTreeIndexOperatorDescriptor[] opDescs = new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
+                * searchOpCallbackFactories.length];
+        int k = 0;
+        for (int i = 0; i < indexDataflowHelperFactories.length; i++) {
+            for (int j = 0; j < searchOpCallbackFactories.length; j++) {
+                AbstractTreeIndexOperatorDescriptor opDesc = Mockito.mock(AbstractTreeIndexOperatorDescriptor.class);
+                Mockito.when(opDesc.getIndexDataflowHelperFactory()).thenReturn(indexDataflowHelperFactories[i]);
+                Mockito.when(opDesc.getRetainInput()).thenReturn(false);
+                Mockito.when(opDesc.getRetainNull()).thenReturn(false);
+                Mockito.when(opDesc.getSearchOpCallbackFactory()).thenReturn(searchOpCallbackFactories[j]);
+                opDescs[k] = opDesc;
+                k++;
+            }
+        }
+        return opDescs;
+    }
+
+    private ISearchOperationCallbackFactory[] mockSearchOpCallbackFactories() throws HyracksDataException {
+        ISearchOperationCallback searchOpCallback = mockSearchOpCallback();
+        ISearchOperationCallbackFactory searchOpCallbackFactory = Mockito.mock(ISearchOperationCallbackFactory.class);
+        Mockito.when(searchOpCallbackFactory.createSearchOperationCallback(Mockito.anyLong(), Mockito.any()))
+                .thenReturn(searchOpCallback);
+        return new ISearchOperationCallbackFactory[] { searchOpCallbackFactory };
+    }
+
+    private ISearchOperationCallback mockSearchOpCallback() {
+        ISearchOperationCallback opCallback = Mockito.mock(ISearchOperationCallback.class);
+        return opCallback;
+    }
+
+    public class CountAnswer implements Answer<Object> {
+        protected int count = 0;
+
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            count++;
+            return null;
+        }
+
+        public int getCallCount() {
+            return count;
+        }
+
+        public void reset() {
+            count = 0;
+        }
+    }
+
+    public class CountAndThrowException extends CountAnswer {
+        private String errorMessage;
+
+        public CountAndThrowException(String errorMessage) {
+            this.errorMessage = errorMessage;
+        }
+
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            count++;
+            throw new HyracksDataException(errorMessage);
+        }
+    }
+
+    public class CountAndThrowError extends CountAnswer {
+        private String errorMessage;
+
+        public CountAndThrowError(String errorMessage) {
+            this.errorMessage = errorMessage;
+        }
+
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            count++;
+            throw new UnknownError(errorMessage);
+        }
+    }
+
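+    /**
+     * Builds one mock writer for every combination of behaviors (normal, exception, error)
+     * across open(), nextFrame(), fail(), and close(): 81 writers in total.
+     */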
+    public IFrameWriter[] createOutputWriters() throws Exception {
+        CountAnswer[] opens = new CountAnswer[] { openNormal, openException, openError };
+        CountAnswer[] nextFrames = new CountAnswer[] { nextFrameNormal, nextFrameException, nextFrameError };
+        CountAnswer[] fails = new CountAnswer[] { failNormal, failException, failError };
+        CountAnswer[] closes = new CountAnswer[] { closeNormal, closeException, closeError };
+        List<IFrameWriter> outputWriters = new ArrayList<IFrameWriter>();
+        for (CountAnswer openAnswer : opens) {
+            for (CountAnswer nextFrameAnswer : nextFrames) {
+                for (CountAnswer failAnswer : fails) {
+                    for (CountAnswer closeAnswer : closes) {
+                        IFrameWriter writer = Mockito.mock(IFrameWriter.class);
+                        Mockito.doAnswer(openAnswer).when(writer).open();
+                        Mockito.doAnswer(nextFrameAnswer).when(writer).nextFrame(Mockito.any());
+                        Mockito.doAnswer(failAnswer).when(writer).fail();
+                        Mockito.doAnswer(closeAnswer).when(writer).close();
+                        outputWriters.add(writer);
+                    }
+                }
+            }
+        }
+        return outputWriters.toArray(new IFrameWriter[outputWriters.size()]);
+    }
+
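+    /**
+     * Drives a writer through open(), ten nextFrame() calls, fail() on failure, and close(),
+     * validating the observed call counts after each step.
+     */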
+    public void testWriter(IFrameWriter writer, ByteBuffer buffer) throws Exception {
+        resetAllCounters();
+        boolean failed = !validate(false);
+        if (failed) {
+            return;
+        }
+        try {
+            writer.open();
+            failed = !validate(false);
+            if (failed) {
+                return;
+            }
+            for (int i = 0; i < 10; i++) {
+                writer.nextFrame(buffer);
+                failed = !validate(false);
+                if (failed) {
+                    return;
+                }
+            }
+        } catch (Throwable th1) {
+            try {
+                failed = !validate(false);
+                if (failed) {
+                    return;
+                }
+                writer.fail();
+                failed = !validate(false);
+                if (failed) {
+                    return;
+                }
+            } catch (Throwable th2) {
+                failed = !validate(false);
+                if (failed) {
+                    return;
+                }
+            }
+        } finally {
+            if (!failed) {
+                try {
+                    failed = !validate(false);
+                    if (failed) {
+                        return;
+                    }
+                    writer.close();
+                    failed = !validate(true);
+                    if (failed) {
+                        return;
+                    }
+                } catch (Throwable th3) {
+                    failed = !validate(true);
+                    if (failed) {
+                        return;
+                    }
+                }
+            }
+        }
+        successes++;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
index 81d7834..9ee7969 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
@@ -25,8 +25,14 @@
 public interface IIndexDataflowHelper {
     public void create() throws HyracksDataException;
 
+    /**
+     * If close throws an exception, it means that the index was not closed successfully.
+     */
     public void close() throws HyracksDataException;
 
+    /**
+     * If open throws an exception, it means that the index was not opened successfully.
+     */
     public void open() throws HyracksDataException;
 
     public void destroy() throws HyracksDataException;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 82f9fd6..631b2f1 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -33,8 +33,7 @@
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
-public class IndexBulkLoadOperatorNodePushable extends
-        AbstractUnaryInputUnaryOutputOperatorNodePushable {
+public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
     protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
     protected final float fillFactor;
@@ -48,15 +47,12 @@
     protected IRecordDescriptorProvider recDescProvider;
     protected PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
 
-    public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc,
-            IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
-            float fillFactor, boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex,
-            IRecordDescriptorProvider recordDescProvider) {
+    public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, IRecordDescriptorProvider recordDescProvider) {
         this.opDesc = opDesc;
         this.ctx = ctx;
-        this.indexHelper = opDesc.getIndexDataflowHelperFactory()
-                .createIndexDataflowHelper(opDesc, ctx, partition);
+        this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
         this.fillFactor = fillFactor;
         this.verifyInput = verifyInput;
         this.numElementsHint = numElementsHint;
@@ -68,19 +64,16 @@
 
     @Override
     public void open() throws HyracksDataException {
-        RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(
-                opDesc.getActivityId(), 0);
+        RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(recDesc);
         indexHelper.open();
         index = indexHelper.getIndexInstance();
         try {
-            bulkLoader = index.createBulkLoader(fillFactor, verifyInput,
-                    numElementsHint, checkIfEmptyIndex);
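+            // Open the writer before creating the bulk loader so that a creation failure
+            // still leaves close() able to release both the index and the writer.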
+            writer.open();
+            bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
         } catch (Exception e) {
-            indexHelper.close();
             throw new HyracksDataException(e);
         }
-        writer.open();
     }
 
     @Override
@@ -105,15 +98,24 @@
     public void close() throws HyracksDataException {
         try {
             bulkLoader.end();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
         } finally {
-            indexHelper.close();
+            if (index != null) {
+                // The index was opened successfully, so close both the index and the writer.
+                try {
+                    indexHelper.close();
+                } finally {
+                    writer.close();
+                }
+            }
         }
-        writer.close();
     }
 
     @Override
     public void fail() throws HyracksDataException {
+        if (index != null) {
+            writer.fail();
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 4434871..a2913f4 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -55,6 +55,7 @@
     protected IIndexAccessor indexAccessor;
     protected ITupleFilter tupleFilter;
     protected IModificationOperationCallback modCallback;
+    protected IIndex index;
 
     public IndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op) {
@@ -71,12 +72,12 @@
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
-        writer.open();
         indexHelper.open();
-        IIndex index = indexHelper.getIndexInstance();
+        index = indexHelper.getIndexInstance();
         try {
-            modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(indexHelper.getResourceName(),
-                    indexHelper.getResourceID(), index, ctx);
+            writer.open();
+            modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
+                    indexHelper.getResourceName(), indexHelper.getResourceID(), index, ctx);
             indexAccessor = index.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {
@@ -84,7 +85,6 @@
                 frameTuple = new FrameTupleReference();
             }
         } catch (Exception e) {
-            indexHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -129,8 +129,8 @@
                         break;
                     }
                     default: {
-                        throw new HyracksDataException("Unsupported operation " + op
-                                + " in tree index InsertUpdateDelete operator");
+                        throw new HyracksDataException(
+                                "Unsupported operation " + op + " in tree index InsertUpdateDelete operator");
                     }
                 }
             } catch (HyracksDataException e) {
@@ -147,15 +147,19 @@
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            writer.close();
-        } finally {
-            indexHelper.close();
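+        // index is only assigned once indexHelper.open() has succeeded; if open() failed
+        // early, there is nothing to close here.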
+        if (index != null) {
+            try {
+                writer.close();
+            } finally {
+                indexHelper.close();
+            }
         }
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        writer.fail();
+        if (index != null) {
+            writer.fail();
+        }
     }
 }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 3db94e8..568bde8 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -107,11 +107,10 @@
 
     @Override
     public void open() throws HyracksDataException {
-        accessor = new FrameTupleAccessor(inputRecDesc);
         writer.open();
         indexHelper.open();
         index = indexHelper.getIndexInstance();
-
+        accessor = new FrameTupleAccessor(inputRecDesc);
         if (retainNull) {
             int fieldCount = getFieldCount();
             nullTupleBuild = new ArrayTupleBuilder(fieldCount);
@@ -141,7 +140,6 @@
                 frameTuple = new FrameTupleReference();
             }
         } catch (Exception e) {
-            indexHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -164,13 +162,12 @@
                 dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
                 tb.addFieldEndOffset();
             }
-            FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
-                    tb.getSize());
+            FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
         }
 
         if (!matched && retainInput && retainNull) {
-            FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex,
-                    nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize());
+            FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex, nullTupleBuild.getFieldEndOffsets(),
+                    nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize());
         }
     }
 
@@ -192,16 +189,46 @@
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            appender.flush(writer, true);
+        HyracksDataException closeException = null;
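+        // Collect failures from each cleanup step: the first becomes the thrown exception
+        // and later ones are attached to it as suppressed exceptions.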
+        if (index != null) {
+            // if index == null, then the index open was not successful
+            try {
+                appender.flush(writer, true);
+            } catch (Throwable th) {
+                closeException = new HyracksDataException(th);
+            }
+
             try {
                 cursor.close();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
+            } catch (Throwable th) {
+                if (closeException == null) {
+                    closeException = new HyracksDataException(th);
+                } else {
+                    closeException.addSuppressed(th);
+                }
             }
+            try {
+                indexHelper.close();
+            } catch (Throwable th) {
+                if (closeException == null) {
+                    closeException = new HyracksDataException(th);
+                } else {
+                    closeException.addSuppressed(th);
+                }
+            }
+        }
+        try {
+            // writer.close() must always be called, regardless of earlier exceptions
             writer.close();
-        } finally {
-            indexHelper.close();
+        } catch (Throwable th) {
+            if (closeException == null) {
+                closeException = new HyracksDataException(th);
+            } else {
+                closeException.addSuppressed(th);
+            }
+        }
+        if (closeException != null) {
+            throw closeException;
         }
     }
 
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 8c73272..08775bb 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -56,12 +56,12 @@
         try {
             ITreeIndexFrame cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
             ITreeIndexCursor cursor = treeIndexHelper.createDiskOrderScanCursor(cursorFrame);
-            ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory().createSearchOperationCallback(
-                    treeIndexHelper.getResourceID(), ctx);
-            ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor(
-                    NoOpOperationCallback.INSTANCE, searchCallback);
-            writer.open();
+            ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
+                    .createSearchOperationCallback(treeIndexHelper.getResourceID(), ctx);
+            ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex
+                    .createAccessor(NoOpOperationCallback.INSTANCE, searchCallback);
             try {
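+                // Open the writer inside this try so the catch can fail() it and the
+                // finally can always close() it.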
+                writer.open();
                 indexAccessor.diskOrderScan(cursor);
                 int fieldCount = treeIndex.getFieldCount();
                 FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
@@ -83,16 +83,21 @@
                             tb.getSize());
                 }
                 appender.flush(writer, true);
-            } catch (Exception e) {
+            } catch (Throwable th) {
                 writer.fail();
-                throw new HyracksDataException(e);
+                throw new HyracksDataException(th);
             } finally {
-                cursor.close();
-                writer.close();
+                try {
+                    cursor.close();
+                } catch (Exception cursorCloseException) {
+                    throw new IllegalStateException(cursorCloseException);
+                } finally {
+                    writer.close();
+                }
             }
-        } catch (Exception e) {
+        } catch (Throwable th) {
             treeIndexHelper.close();
-            throw new HyracksDataException(e);
+            throw new HyracksDataException(th);
         }
     }
 
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 4a9ea27..c91aff7 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -66,11 +66,11 @@
 
     @Override
     public void open() throws HyracksDataException {
+        writer.open();
         accessor = new FrameTupleAccessor(inputRecDesc);
         builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
         builderData = builder.getFieldData();
         appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
-        writer.open();
     }
 
     @Override
@@ -81,10 +81,9 @@
         for (int i = 0; i < tupleCount; i++) {
             short numTokens = 0;
 
-            tokenizer.reset(
-                    accessor.getBuffer().array(),
-                    accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
-                            + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
+            tokenizer.reset(accessor.getBuffer().array(), accessor.getTupleStartOffset(i)
+                    + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(i, docField),
+                    accessor.getFieldLength(i, docField));
 
             if (addNumTokensKey) {
                 // Get the total number of tokens.
@@ -154,8 +153,11 @@
 
     @Override
     public void close() throws HyracksDataException {
-        appender.flush(writer, true);
-        writer.close();
+        try {
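+            // Flush any buffered tuples; the finally guarantees the writer is closed
+            // even if the flush fails.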
+            appender.flush(writer, true);
+        } finally {
+            writer.close();
+        }
     }
 
     @Override