ASTERIXDB-1791, ASTERIXDB-1796: fix failure handling in runtime operators.

This change includes the following parts:
- Fix the implementation of fail() and close() in several runtime operators
  to avoid file handle leak and job hang;
- Add an erase method to RunFileWriter which closes files before deleting
  them in order to release the disk space they hold;
- Call RunFileWriter.close() and RunFileReader.close() in "finally" blocks.
- Fix RunFileReader to not truncate files to be deleted - it is not the root
  cause of un-released disk space - open deleted files are the root cause;
- Check file handle leaks in LangExecutionUtil.tearDown().

Change-Id: I203168171e6dac16b57d2eda960823e3810e22a3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1513
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index a3d6102..d04217c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -83,47 +83,53 @@
             TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
             IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
                     CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
-            partitioner.open();
-            FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
             List<TestFrameWriter> recipients = new ArrayList<>();
-            for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
-                recipients.add((TestFrameWriter) writer);
-            }
-            partitioner.flush();
-            for (TestFrameWriter writer : recipients) {
-                Assert.assertEquals(writer.nextFrameCount(), 1);
-                fta.reset(writer.getLastFrame());
-                Assert.assertEquals(fta.getTupleCount(), 1);
-                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
-                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
-            }
-            message.getBuffer().clear();
-            message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
-            message.getBuffer().flip();
-            partitioner.flush();
-            for (TestFrameWriter writer : recipients) {
-                Assert.assertEquals(writer.nextFrameCount(), 2);
-                fta.reset(writer.getLastFrame());
-                Assert.assertEquals(fta.getTupleCount(), 1);
-                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
-                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
-            }
+            try {
+                partitioner.open();
+                FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+                for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
+                    recipients.add((TestFrameWriter) writer);
+                }
+                partitioner.flush();
+                for (TestFrameWriter writer : recipients) {
+                    Assert.assertEquals(writer.nextFrameCount(), 1);
+                    fta.reset(writer.getLastFrame());
+                    Assert.assertEquals(fta.getTupleCount(), 1);
+                    FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                    Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+                            MessagingFrameTupleAppender.getMessageType(tempBuffer));
+                }
+                message.getBuffer().clear();
+                message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+                message.getBuffer().flip();
+                partitioner.flush();
+                for (TestFrameWriter writer : recipients) {
+                    Assert.assertEquals(writer.nextFrameCount(), 2);
+                    fta.reset(writer.getLastFrame());
+                    Assert.assertEquals(fta.getTupleCount(), 1);
+                    FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                    Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
+                            MessagingFrameTupleAppender.getMessageType(tempBuffer));
+                }
 
-            message.getBuffer().clear();
-            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
-            message.getBuffer().flip();
-            partitioner.flush();
-            for (TestFrameWriter writer : recipients) {
-                Assert.assertEquals(writer.nextFrameCount(), 3);
-                fta.reset(writer.getLastFrame());
-                Assert.assertEquals(fta.getTupleCount(), 1);
-                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
-                Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
-                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+                message.getBuffer().clear();
+                message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+                message.getBuffer().flip();
+                partitioner.flush();
+                for (TestFrameWriter writer : recipients) {
+                    Assert.assertEquals(writer.nextFrameCount(), 3);
+                    fta.reset(writer.getLastFrame());
+                    Assert.assertEquals(fta.getTupleCount(), 1);
+                    FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                    Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+                            MessagingFrameTupleAppender.getMessageType(tempBuffer));
+                }
+            } catch (Throwable t) {
+                partitioner.fail();
+                throw t;
+            } finally {
+                partitioner.close();
             }
-            partitioner.close();
             for (TestFrameWriter writer : recipients) {
                 Assert.assertEquals(writer.nextFrameCount(), 4);
                 Assert.assertEquals(writer.closeCount(), 1);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 4db00c8..0e6be0f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,7 +19,12 @@
 
 package org.apache.asterix.test.runtime;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -28,7 +33,9 @@
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -63,6 +70,9 @@
     }
 
     public static void tearDown() throws Exception {
+        // Check whether there are leaked open run file handles.
+        checkRunFileLeaks();
+
         TestLibrarian.removeLibraryDir();
         ExecutionTestUtil.tearDown(cleanupOnStop);
         ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
@@ -117,4 +127,22 @@
             System.err.flush();
         }
     }
+
+    private static void checkRunFileLeaks() throws IOException {
+        if (SystemUtils.IS_OS_WINDOWS) {
+            return;
+        }
+        // Only run the check on Linux and MacOS.
+        RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+        String processName = runtimeMXBean.getName();
+        String processId = processName.split("@")[0];
+
+        // Checks whether there are leaked run files from operators.
+        Process process = Runtime.getRuntime()
+                .exec(new String[] { "bash", "-c", "lsof -p " + processId + "|grep waf|wc -l" });
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+            int runFileCount = Integer.parseInt(reader.readLine().trim());
+            Assert.assertTrue(runFileCount == 0);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
index 1757130..9b2e66d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.aql
@@ -18,11 +18,11 @@
  */
 use dataverse tpch;
 
- 
+
 for $l in dataset('LineItem')
 //where inject-failure($l.l_shipdate <= '1998-09-02', $l.l_orderkey=5999)
 /*+ hash*/
-group by $l_returnflag := $l.l_returnflag, $l_linestatus := $l.l_linestatus  
+group by $l_returnflag := $l.l_returnflag, $l_linestatus := $l.l_linestatus
   with $l
 order by $l_returnflag, $l_linestatus
 return {
@@ -32,8 +32,8 @@
   "sum_base_price": sum(for $i in $l return $i.l_extendedprice),
   "sum_disc_price": sum(for $i in $l return $i.l_extendedprice * (1 - $i.l_discount)),
   "sum_charge": sum(for $i in $l return $i.l_extendedprice * (1 - $i.l_discount) * (1 + $i.l_tax)),
-  "ave_qty": avg(for $i in $l return $i.l_quantity),  
+  "ave_qty": avg(for $i in $l return $i.l_quantity),
   "ave_price": avg(for $i in $l return $i.l_extendedprice),
   "ave_disc": avg(for $i in $l return $i.l_discount),
   "count_order": count($l)
-}   
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..6b277db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as
+ closed {
+  l_orderkey : integer,
+  l_partkey : integer,
+  l_suppkey : integer,
+  l_linenumber : integer,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp
new file mode 100644
index 0000000..39205db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp
new file mode 100644
index 0000000..a3b7589
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_failure/group_by_failure.3.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+SELECT inject_failure(l_orderkey, l_orderkey=5988), SUM(l_quantity) t_sum_quantity
+FROM  LineItem l
+GROUP BY l_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..6b277db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as
+ closed {
+  l_orderkey : integer,
+  l_partkey : integer,
+  l_suppkey : integer,
+  l_linenumber : integer,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp
new file mode 100644
index 0000000..39205db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp
new file mode 100644
index 0000000..c11a0ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/group_by_hash_failure/group_by_hash_failure.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+SELECT inject_failure(l_orderkey, l_orderkey=5988), SUM(l_quantity) t_sum_quantity
+FROM  LineItem l
+/*+ hash */
+GROUP BY l_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..6b277db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.1.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as
+ closed {
+  l_orderkey : integer,
+  l_partkey : integer,
+  l_suppkey : integer,
+  l_linenumber : integer,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp
new file mode 100644
index 0000000..39205db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp
new file mode 100644
index 0000000..0314a6e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/order_by_failure/order_by_failure.3.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+SELECT inject_failure(l_orderkey, l_orderkey=1024), l_quantity
+FROM  LineItem l
+ORDER BY l_shipdate, l_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
index f91df13..5ebec80 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.1.ddl.sqlpp
@@ -20,10 +20,10 @@
 drop  dataverse tpch if exists;
 create  dataverse tpch;
 
-use test;
+use tpch;
 
 
-create type test.LineItemType as
+create type LineItemType as
  closed {
   l_orderkey : integer,
   l_partkey : integer,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
index 5fe734c..0340837 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.2.update.sqlpp
@@ -20,5 +20,6 @@
 use tpch;
 
 
-load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
index cac4a08..e96644b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q01_pricing_summary_report_failure/q01_pricing_summary_report_failure.3.query.sqlpp
@@ -17,33 +17,22 @@
  * under the License.
  */
 
-use tpch;
+USE tpch;
 
+SET `import-private-functions` "true";
 
-select element {'l_returnflag':l_returnflag,'l_linestatus':l_linestatus,'sum_qty':tpch.coll_sum((
-        select element i.l_quantity
-        from  l as i
-    )),'sum_base_price':tpch.coll_sum((
-        select element i.l_extendedprice
-        from  l as i
-    )),'sum_disc_price':tpch.coll_sum((
-        select element (i.l_extendedprice * (1 - i.l_discount))
-        from  l as i
-    )),'sum_charge':tpch.coll_sum((
-        select element (i.l_extendedprice * (1 - i.l_discount) * (1 + i.l_tax))
-        from  l as i
-    )),'ave_qty':tpch.coll_avg((
-        select element i.l_quantity
-        from  l as i
-    )),'ave_price':tpch.coll_avg((
-        select element i.l_extendedprice
-        from  l as i
-    )),'ave_disc':tpch.coll_avg((
-        select element i.l_discount
-        from  l as i
-    )),'count_order':tpch.coll_count(l)}
-from  LineItem as l
-/* +hash */
-group by l.l_returnflag as l_returnflag,l.l_linestatus as l_linestatus
-order by l_returnflag,l_linestatus
+SELECT  l_returnflag,
+        l_linestatus,
+        sum(l_quantity) AS sum_qty,
+        sum(l_extendedprice) AS sum_base_price,
+        sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+        avg(l_quantity) AS ave_qty,
+        avg(l_extendedprice) AS ave_price,
+        avg(l_discount) AS ave_disc,
+        count(1) AS count_order
+FROM  LineItem l
+WHERE l.l_shipdate <= '1998-09-02' AND inject_failure(true, l.l_orderkey=5988)
+GROUP BY l_returnflag, l_linestatus
+ORDER BY l_returnflag, l_linestatus
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp
new file mode 100644
index 0000000..9e64c15
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.1.ddl.sqlpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type LineItemType as {
+  l_orderkey : bigint,
+  l_partkey : bigint,
+  l_suppkey : bigint,
+  l_linenumber : bigint,
+  l_quantity : bigint,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create type OrderType as {
+  o_orderkey : bigint,
+  o_custkey : bigint,
+  o_orderstatus : string,
+  o_totalprice : double,
+  o_orderdate : string,
+  o_orderpriority : string,
+  o_clerk : string,
+  o_shippriority : bigint,
+  o_comment : string
+}
+
+create type CustomerType as {
+  c_custkey : bigint,
+  c_name : string,
+  c_address : string,
+  c_nationkey : bigint,
+  c_phone : string,
+  c_acctbal : double,
+  c_mktsegment : string,
+  c_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
+create  dataset Orders(OrderType) primary key o_orderkey;
+
+create  dataset Customer(CustomerType) primary key c_custkey;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp
new file mode 100644
index 0000000..83ec6c3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.2.update.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),
+(`delimiter`=`|`));
+
+load  dataset Orders using localfs ((`path`=`asterix_nc1://data/tpch0.001/orders.tbl`),(`format`=`delimited-text`),
+(`delimiter`=`|`));
+
+load  dataset Customer using localfs ((`path`=`asterix_nc1://data/tpch0.001/customer.tbl`),(`format`=`delimited-text`),
+(`delimiter`=`|`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp
new file mode 100644
index 0000000..35acf08
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/failure/q18_large_volume_customer_failure/q18_large_volume_customer_failure.3.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `import-private-functions` "true";
+
+WITH tmp AS
+(
+    SELECT l_orderkey, SUM(l_quantity) t_sum_quantity
+    FROM  LineItem
+    GROUP BY l_orderkey
+)
+
+SELECT c.c_name, c.c_custkey, inject_failure(o.o_orderkey, o.o_orderkey=5988),
+       o.o_orderdate, o.o_totalprice, l.l_quantity
+FROM  LineItem l
+JOIN  tmp t ON t.l_orderkey = l.l_orderkey
+JOIN  Orders o ON o.o_orderkey = t.l_orderkey
+JOIN  Customer c ON c.c_custkey = o.o_custkey
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0d8da65..431b215 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -2028,13 +2028,36 @@
     </test-case>
   </test-group>
   <test-group name="failure">
-    <!--
-        <test-case FilePath="failure">
-          <compilation-unit name="q1_pricing_summary_report_failure">
-            <output-dir compare="Text">q1_pricing_summary_report_failure</output-dir>
-          </compilation-unit>
-        </test-case>
-        -->
+    <test-case FilePath="failure">
+      <compilation-unit name="group_by_failure">
+        <output-dir compare="Text">group_by_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failure">
+      <compilation-unit name="group_by_hash_failure">
+        <output-dir compare="Text">group_by_hash_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failure">
+      <compilation-unit name="q01_pricing_summary_report_failure">
+        <output-dir compare="Text">q01_pricing_summary_report_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failure">
+      <compilation-unit name="q18_large_volume_customer_failure">
+        <output-dir compare="Text">q18_large_volume_customer_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failure">
+      <compilation-unit name="order_by_failure">
+        <output-dir compare="Text">order_by_failure</output-dir>
+        <expected-error>Injected failure in asterix:inject-failure</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <!--
     <test-group name="flwor">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index baf5dba..dd713e6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -155,6 +155,9 @@
     public void close() throws HyracksDataException {
         try {
             appender.write(writer, true);
+        } catch (Exception e) {
+            writer.fail();
+            throw e;
         } finally {
             writer.close();
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index cd04515..9982477 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -129,8 +129,9 @@
             }
         } catch (Throwable th) {
             LOGGER.log(Level.WARNING, th.getMessage(), th);
+        } finally {
+            writer.close();
         }
-        writer.close();
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 99fff19..fe2d4ec 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -74,15 +74,17 @@
                 throw new RuntimeDataException(
                         ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION);
             }
-        } catch (Throwable ie) {
+        } catch (Exception e) {
             /*
-             * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
-             * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
-             * The surviving intake partitions must continue to live and receive data from the external source.
+             * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another
+             * node involved in the Hyracks job. As the Intake job involves only the intake operator, the exception is
+             * indicative of a failure at the sibling intake operator location. The surviving intake partitions must
+             * continue to live and receive data from the external source.
              */
-            throw new HyracksDataException(ie);
+            writer.fail();
+            throw e;
         } finally {
-                writer.close();
+            writer.close();
         }
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index b2ffd6e..6869523 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -301,10 +301,11 @@
     @Override
     public void close() throws HyracksDataException {
         try {
-            cursor.close();
-            writer.close();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
+            try {
+                cursor.close();
+            } finally {
+                writer.close();
+            }
         } finally {
             indexHelper.close();
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
index c3e2681..377ad60 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
@@ -39,8 +39,11 @@
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
-                writer.close();
+                try {
+                    writer.open();
+                } finally {
+                    writer.close();
+                }
             }
 
         };
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index d38c5b7..33078ff 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -170,8 +170,14 @@
         if (isSink) {
             return;
         }
-        flushIfNotFailed();
-        writer.close();
+        try {
+            flushIfNotFailed();
+        } catch (Exception e) {
+            writer.fail();
+            throw e;
+        } finally {
+            writer.close();
+        }
         appender.reset(frame, true);
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index e94f4b7..2d8eaed 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -53,6 +53,9 @@
     public void close() throws HyracksDataException {
         try {
             flushIfNotFailed();
+        } catch (Exception e) {
+            writer.fail();
+            throw e;
         } finally {
             writer.close();
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 1123c5e..3ac8c40 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -21,8 +21,6 @@
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -36,6 +34,8 @@
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 60d7eec..aefc99d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -179,7 +179,7 @@
             @Override
             public void fail() throws HyracksDataException {
                 if (isOpen) {
-                    writer.fail();
+                    super.fail();
                 }
             }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 8d312d5..b5bd1a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -53,7 +53,7 @@
     public static final int RESULT_FAILURE_EXCEPTION = 16;
     public static final int RESULT_FAILURE_NO_EXCEPTION = 17;
     public static final int INCONSISTENT_RESULT_METADATA = 18;
-    public static final int CANNOT_TRUNCATE_OR_DELETE_FILE = 19;
+    public static final int CANNOT_DELETE_FILE = 19;
     public static final int NOT_A_JOBID = 20;
     public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
     public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 7f90c35..f536d3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -37,7 +37,7 @@
 16 = Failure producing result set %1$s for job %2$s
 17 = No exception for failed result set %1$s for job %2$s
 18 = Inconsistent metadata for result set %1$s"
-19 = Can't truncate or delete the file: %1$s
+19 = Cannot delete the file: %1$s
 20 = '%1$s' is not a valid job id.
 21 = The distributed job %1$s was not found
 22 = The distributed job %1$s already exists
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index f7aa2e8..952eb75 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -55,6 +55,8 @@
 
     private boolean partitionRegistered;
 
+    private boolean failed = false;
+
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
             ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
             DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory) {
@@ -97,6 +99,7 @@
     @Override
     public void fail() throws HyracksDataException {
         try {
+            failed = true;
             resultState.closeAndDelete();
             resultState.abort();
             registerResultPartitionLocation(false);
@@ -111,8 +114,13 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + partition + ")");
         }
-        registerResultPartitionLocation(true);
-        resultState.close();
+        try {
+            if (!failed) {
+                registerResultPartitionLocation(true);
+            }
+        } finally {
+            resultState.close();
+        }
         try {
             manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
         } catch (HyracksException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index be7ed3d..c501b5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -86,6 +86,7 @@
 
     public synchronized void close() {
         eos.set(true);
+        closeWriteFileHandle();
         notifyAll();
     }
 
@@ -93,6 +94,13 @@
         // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
         // to be taken when there are more requests to these result states.
         failed.set(true);
+        closeWriteFileHandle();
+        if (fileRef != null) {
+            fileRef.delete();
+        }
+    }
+
+    private void closeWriteFileHandle() {
         if (writeFileHandle != null) {
             try {
                 ioManager.close(writeFileHandle);
@@ -100,9 +108,6 @@
                 // Since file handle could not be closed, just ignore.
             }
         }
-        if (fileRef != null) {
-            fileRef.delete();
-        }
     }
 
     public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
@@ -114,7 +119,6 @@
         }
 
         size += ioManager.syncWrite(writeFileHandle, size, buffer);
-
         notifyAll();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index f0e7f0e..33b8980 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -72,7 +72,6 @@
     }
 
     public void close() throws IOException {
-        channel.close();
         raf.close();
     }
 
@@ -80,10 +79,6 @@
         return fileRef;
     }
 
-    public RandomAccessFile getRandomAccessFile() {
-        return raf;
-    }
-
     public FileChannel getFileChannel() {
         return channel;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
index e5e81ab..e51d2bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -80,10 +80,13 @@
             @Override
             public void close() throws HyracksDataException {
                 closeTime = System.currentTimeMillis();
-                ((Task) ctx)
-                        .setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(),
-                                cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
-                writer.close();
+                try {
+                    ((Task) ctx).setPartitionSendProfile(
+                            new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(), cd.getConnectorId(),
+                                    senderIndex, receiverIndex), openTime, closeTime, mrep));
+                } finally {
+                    writer.close();
+                }
             }
 
             @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
index 365b01c..d9a4c7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
@@ -62,8 +62,14 @@
         if (!open) {
             throw new HyracksDataException("Closing SerializingDataWriter that has not been opened");
         }
-        tupleAppender.write(frameWriter, true);
-        frameWriter.close();
+        try {
+            tupleAppender.write(frameWriter, true);
+        } catch (Exception e) {
+            frameWriter.fail();
+            throw e;
+        } finally {
+            frameWriter.close();
+        }
         open = false;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index b69f377..f0bd318 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -29,7 +29,6 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.control.nc.io.FileHandle;
 
 public class RunFileReader implements IFrameReader {
     private final FileReference file;
@@ -49,7 +48,7 @@
     @Override
     public void open() throws HyracksDataException {
-        // Opens RW mode because we need to truncate the given file if required.
+        // Opens in read-only mode; close() now deletes (rather than truncates) the file when requested.
-        handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_WRITE,
+        handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_ONLY,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         readPtr = 0;
     }
@@ -87,12 +86,10 @@
     public void close() throws HyracksDataException {
         if (deleteAfterClose) {
             try {
-                // Truncates the file size to zero since OS might be keeping the file for a while.
-                ((FileHandle) handle).getFileChannel().truncate(0);
                 ioManager.close(handle);
                 FileUtils.deleteQuietly(file.getFile());
             } catch (IOException e) {
-                throw HyracksDataException.create(ErrorCode.CANNOT_TRUNCATE_OR_DELETE_FILE, e, file.toString());
+                throw HyracksDataException.create(ErrorCode.CANNOT_DELETE_FILE, e, file.toString());
             }
         } else {
             ioManager.close(handle);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index 8031422..915c63d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -27,8 +27,8 @@
 import org.apache.hyracks.api.io.IIOManager;
 
 public class RunFileWriter implements IFrameWriter {
-    private final FileReference file;
     private final IIOManager ioManager;
+    private FileReference file;
     private boolean failed;
 
     private IFileHandle handle;
@@ -69,6 +69,15 @@
         }
     }
 
+    public void erase() throws HyracksDataException {
+        close();
+        file.delete();
+
+    // Make sure we never access the file after it has been deleted.
+        file = null;
+        handle = null;
+    }
+
     public FileReference getFileReference() {
         return file;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 7d10802..c049b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -107,10 +107,9 @@
     @Override
     public void close() throws HyracksDataException {
         if (isFailed && state.getRuns() != null) {
-            for (int i = 0; i < state.getRuns().length; i++) {
-                RunFileWriter run = state.getRuns()[i];
+            for (RunFileWriter run : state.getRuns()) {
                 if (run != null) {
-                    run.getFileReference().delete();
+                    run.erase();
                 }
             }
         } else {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
index 0dbb063..b17215f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
@@ -95,9 +95,14 @@
             writer.open();
-            doPass(table, partitionRuns, numberOfTuples, writer, 1); // level 0 use used at build stage.
+            doPass(table, partitionRuns, numberOfTuples, writer, 1); // level 0 is used at the build stage.
         } catch (Exception e) {
-            generatedRuns.forEach(run -> run.getFileReference().delete());
-            writer.fail();
-            throw new HyracksDataException(e);
+            try {
+                for (RunFileWriter run : generatedRuns) {
+                    run.erase();
+                }
+            } finally {
+                writer.fail();
+            }
+            throw e;
         } finally {
             writer.close();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
index e0ef2b3..d29e9ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
@@ -75,10 +75,17 @@
     }
 
     public void flushSpilledPartitions() throws HyracksDataException {
-        for (int i = 0; i < runWriters.length; ++i) {
-            if (runWriters[i] != null) {
-                flushPartitionToRun(i, runWriters[i]);
-                runWriters[i].close();
+        try {
+            for (int i = 0; i < runWriters.length; ++i) {
+                if (runWriters[i] != null) {
+                    flushPartitionToRun(i, runWriters[i]);
+                }
+            }
+        } finally {
+            for (int i = 0; i < runWriters.length; ++i) {
+                if (runWriters[i] != null) {
+                    runWriters[i].close();
+                }
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index ad14bad..b622c9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -470,8 +470,12 @@
                 @Override
                 public void close() throws HyracksDataException {
                     try {
-                        state.joiner.join(inBuffer.getBuffer(), writer);
-                        state.joiner.closeJoin(writer);
+                        try {
+                            state.joiner.join(inBuffer.getBuffer(), writer);
+                            state.joiner.completeJoin(writer);
+                        } finally {
+                            state.joiner.releaseMemory();
+                        }
                         ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
                                 .createPartitioner();
                         ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
@@ -508,25 +512,35 @@
 
                                 if (buildWriter != null) {
                                     RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
-                                    buildReader.open();
-                                    while (buildReader.nextFrame(inBuffer)) {
-                                        ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
-                                        FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
-                                        joiner.build(copyBuffer);
-                                        inBuffer.reset();
+                                    try {
+                                        buildReader.open();
+                                        while (buildReader.nextFrame(inBuffer)) {
+                                            ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
+                                            FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
+                                            joiner.build(copyBuffer);
+                                            inBuffer.reset();
+                                        }
+                                    } finally {
+                                        buildReader.close();
                                     }
-                                    buildReader.close();
                                 }
 
                                 // probe
                                 RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
-                                probeReader.open();
-                                while (probeReader.nextFrame(inBuffer)) {
-                                    joiner.join(inBuffer.getBuffer(), writer);
-                                    inBuffer.reset();
+                                try {
+                                    probeReader.open();
+                                    try {
+                                        while (probeReader.nextFrame(inBuffer)) {
+                                            joiner.join(inBuffer.getBuffer(), writer);
+                                            inBuffer.reset();
+                                        }
+                                        joiner.completeJoin(writer);
+                                    } finally {
+                                        joiner.releaseMemory();
+                                    }
+                                } finally {
+                                    probeReader.close();
                                 }
-                                probeReader.close();
-                                joiner.closeJoin(writer);
                             }
                         }
                     } finally {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 8e52838..ec1c3a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -196,8 +196,11 @@
         accessorProbe.reset(newAccessorProbe.getBuffer());
     }
 
-    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+    public void completeJoin(IFrameWriter writer) throws HyracksDataException {
         appender.write(writer, true);
+    }
+
+    public void releaseMemory() throws HyracksDataException {
         int nFrames = buffers.size();
         // Frames assigned to the data table will be released here.
         if (bufferManager != null) {
@@ -206,7 +209,6 @@
             }
         }
         buffers.clear();
-
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
                     + Thread.currentThread().getId() + ".");
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index a8d3f7e..cbeadd8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -275,9 +275,13 @@
                 @Override
                 public void close() throws HyracksDataException {
                     try {
-                        state.joiner.closeJoin(writer);
+                        state.joiner.completeJoin(writer);
                     } finally {
-                        writer.close();
+                        try {
+                            state.joiner.releaseMemory();
+                        } finally {
+                            writer.close();
+                        }
                     }
                 }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 202aac6..16c21df 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -50,7 +50,6 @@
     private final IFrame outBuffer;
     private final IFrame innerBuffer;
     private final VariableFrameMemoryManager outerBufferMngr;
-    private RunFileReader runFileReader;
     private final RunFileWriter runFileWriter;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder missingTupleBuilder;
@@ -103,14 +102,17 @@
 
     public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
         if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
-            runFileReader = runFileWriter.createReader();
-            runFileReader.open();
-            while (runFileReader.nextFrame(innerBuffer)) {
-                for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                    blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+            RunFileReader runFileReader = runFileWriter.createReader();
+            try {
+                runFileReader.open();
+                while (runFileReader.nextFrame(innerBuffer)) {
+                    for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+                        blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+                    }
                 }
+            } finally {
+                runFileReader.close();
             }
-            runFileReader.close();
             outerBufferMngr.reset();
             if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
                 throw new HyracksDataException("The given outer frame of size:" + outerBuffer.capacity()
@@ -174,20 +176,25 @@
         }
     }
 
-    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
-        runFileReader = runFileWriter.createDeleteOnCloseReader();
-        runFileReader.open();
-        while (runFileReader.nextFrame(innerBuffer)) {
-            for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+    public void completeJoin(IFrameWriter writer) throws HyracksDataException {
+        RunFileReader runFileReader = runFileWriter.createDeleteOnCloseReader();
+        try {
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
+                    blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
+                }
             }
+        } finally {
+            runFileReader.close();
         }
-        runFileReader.close();
-        outerBufferMngr.reset();
-
         appender.write(writer, true);
     }
 
+    public void releaseMemory() throws HyracksDataException {
+        outerBufferMngr.reset();
+    }
+
     private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
             throws HyracksDataException {
         int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 09b7544..5d79f75 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -173,6 +173,7 @@
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private JoinCacheTaskState state;
+                boolean failed = false;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -188,8 +189,24 @@
 
                 @Override
                 public void close() throws HyracksDataException {
+                    if (failed) {
+                        try {
+                            state.joiner.closeCache();
+                        } finally {
+                            writer.close();
+                        }
+                        return;
+                    }
                     try {
-                        state.joiner.closeJoin(writer);
+                        try {
+                            state.joiner.completeJoin(writer);
+                        } finally {
+                            state.joiner.releaseMemory();
+                        }
+                    } catch (Exception e) {
+                        state.joiner.closeCache();
+                        writer.fail();
+                        throw e;
                     } finally {
                         writer.close();
                     }
@@ -197,6 +214,7 @@
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    failed = true;
                     writer.fail();
                 }
             };
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 17f009e..a5e2f6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -240,10 +240,10 @@
     /**
      * In case of failure happens, we need to clear up the generated temporary files.
      */
-    public void clearBuildTempFiles() {
+    public void clearBuildTempFiles() throws HyracksDataException {
         for (int i = 0; i < buildRFWriters.length; i++) {
             if (buildRFWriters[i] != null) {
-                buildRFWriters[i].getFileReference().delete();
+                buildRFWriters[i].erase();
             }
         }
     }
@@ -258,17 +258,22 @@
                 runFileWriters = probeRFWriters;
                 break;
         }
-
-        for (int pid = spilledStatus.nextSetBit(0); pid >= 0
-                && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
-            if (bufferManager.getNumTuples(pid) > 0) {
-                bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
-                bufferManager.clearPartition(pid);
+        try {
+            for (int pid = spilledStatus.nextSetBit(0); pid >= 0
+                    && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
+                if (bufferManager.getNumTuples(pid) > 0) {
+                    bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
+                    bufferManager.clearPartition(pid);
+                }
             }
-            // It doesn't matter whether a spilled partition currently holds a tuple in memory or not.
-            // The file that holds the corresponding spilled partition needs to be closed.
-            if (runFileWriters[pid] != null) {
-                runFileWriters[pid].close();
+        } finally {
+            // Forcibly close every run file writer, even if flushing a partition failed above.
+            if (runFileWriters != null) {
+                for (RunFileWriter runFileWriter : runFileWriters) {
+                    if (runFileWriter != null) {
+                        runFileWriter.close();
+                    }
+                }
             }
         }
     }
@@ -418,26 +423,28 @@
 
     private boolean loadSpilledPartitionToMem(int pid, RunFileWriter wr) throws HyracksDataException {
         RunFileReader r = wr.createReader();
-        r.open();
-        if (reloadBuffer == null) {
-            reloadBuffer = new VSizeFrame(ctx);
-        }
-        while (r.nextFrame(reloadBuffer)) {
-            accessorBuild.reset(reloadBuffer.getBuffer());
-            for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
-                if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+        try {
+            r.open();
+            if (reloadBuffer == null) {
+                reloadBuffer = new VSizeFrame(ctx);
+            }
+            while (r.nextFrame(reloadBuffer)) {
+                accessorBuild.reset(reloadBuffer.getBuffer());
+                for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
+                    if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+                        continue;
+                    }
                     // for some reason (e.g. due to fragmentation) if the inserting failed,
                     // we need to clear the occupied frames
                     bufferManager.clearPartition(pid);
-                    r.close();
                     return false;
                 }
             }
+            // The run file has been fully loaded into memory, so mark it for deletion on close.
+            r.setDeleteAfterClose(true);
+        } finally {
+            r.close();
         }
-
-        // Closes and deletes the run file if it is already loaded into memory.
-        r.setDeleteAfterClose(true);
-        r.close();
         spilledStatus.set(pid, false);
         buildRFWriters[pid] = null;
         return true;
@@ -538,10 +545,13 @@
         return spilledStatus.nextSetBit(0) < 0;
     }
 
-    public void closeProbe(IFrameWriter writer) throws HyracksDataException {
+    public void completeProbe(IFrameWriter writer) throws HyracksDataException {
         //We do NOT join the spilled partitions here, that decision is made at the descriptor level
         //(which join technique to use)
-        inMemJoiner.closeJoin(writer);
+        inMemJoiner.completeJoin(writer);
+    }
+
+    public void releaseResource() throws HyracksDataException {
         inMemJoiner.closeTable();
         closeAllSpilledPartitions(SIDE.PROBE);
         bufferManager.close();
@@ -553,10 +563,10 @@
     /**
      * In case of failure happens, we need to clear up the generated temporary files.
      */
-    public void clearProbeTempFiles() {
+    public void clearProbeTempFiles() throws HyracksDataException {
         for (int i = 0; i < probeRFWriters.length; i++) {
             if (probeRFWriters[i] != null) {
-                probeRFWriters[i].getFileReference().delete();
+                probeRFWriters[i].erase();
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index a72c0c6..d5e3568 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -385,6 +385,7 @@
 
                 private FrameTupleAppender nullResultAppender = null;
                 private FrameTupleAccessor probeTupleAccessor;
+                private boolean failed = false;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -406,21 +407,33 @@
 
                 @Override
                 public void fail() throws HyracksDataException {
-                    state.hybridHJ.clearProbeTempFiles();
+                    failed = true;
                     writer.fail();
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
+                    if (failed) {
+                        try {
+                            // Clear temp files if fail() was called.
+                            state.hybridHJ.clearBuildTempFiles();
+                            state.hybridHJ.clearProbeTempFiles();
+                        } finally {
+                            writer.close(); // writer should always be closed.
+                        }
+                        logProbeComplete();
+                        return;
+                    }
                     try {
-                        state.hybridHJ.closeProbe(writer);
-
+                        try {
+                            state.hybridHJ.completeProbe(writer);
+                        } finally {
+                            state.hybridHJ.releaseResource();
+                        }
                         BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
-
                         rPartbuff.reset();
                         for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus
                                 .nextSetBit(pid + 1)) {
-
                             RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
                             RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
 
@@ -434,10 +447,25 @@
                             int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
                             joinPartitionPair(bReader, pReader, bSize, pSize, 1);
                         }
-
+                    } catch (Exception e) {
+                        // Since writer.nextFrame() is called in the above "try" body, we have to call writer.fail()
+                        // to send the failure signal to the downstream, when there is a throwable thrown.
+                        writer.fail();
+                        // Clear temp files here, since neither this.fail() nor this.close() will be called again after close().
+                        state.hybridHJ.clearBuildTempFiles();
+                        state.hybridHJ.clearProbeTempFiles();
+                        // Re-throw whatever was caught.
+                        throw e;
                     } finally {
-                        writer.close();
+                        try {
+                            logProbeComplete();
+                        } finally {
+                            writer.close();
+                        }
                     }
+                }
+
+                private void logProbeComplete() {
                     if (LOGGER.isLoggable(Level.FINE)) {
                         LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
                     }
@@ -542,9 +570,7 @@
 
                     boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                             && buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
-
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
-
                     OptimizedHybridHashJoin rHHj;
                     int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
                     rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeKeys,
@@ -552,79 +578,107 @@
                             nonMatchWriterFactories); //checked-confirmed
 
                     rHHj.setIsReversed(isReversed);
-                    buildSideReader.open();
-                    rHHj.initBuild();
-                    rPartbuff.reset();
-                    while (buildSideReader.nextFrame(rPartbuff)) {
-                        rHHj.build(rPartbuff.getBuffer());
+                    try {
+                        buildSideReader.open();
+                        try {
+                            rHHj.initBuild();
+                            rPartbuff.reset();
+                            while (buildSideReader.nextFrame(rPartbuff)) {
+                                rHHj.build(rPartbuff.getBuffer());
+                            }
+                        } finally {
+                            // Makes sure that files are always properly closed.
+                            rHHj.closeBuild();
+                        }
+                    } finally {
+                        buildSideReader.close();
                     }
-                    rHHj.closeBuild();
-                    buildSideReader.close();
-                    probeSideReader.open();
-                    rHHj.initProbe();
-                    rPartbuff.reset();
-                    while (probeSideReader.nextFrame(rPartbuff)) {
-                        rHHj.probe(rPartbuff.getBuffer(), writer);
+                    try {
+                        probeSideReader.open();
+                        rPartbuff.reset();
+                        try {
+                            rHHj.initProbe();
+                            while (probeSideReader.nextFrame(rPartbuff)) {
+                                rHHj.probe(rPartbuff.getBuffer(), writer);
+                            }
+                            rHHj.completeProbe(writer);
+                        } finally {
+                            rHHj.releaseResource();
+                        }
+                    } finally {
+                        // Makes sure that files are always properly closed.
+                        probeSideReader.close();
                     }
-                    rHHj.closeProbe(writer);
-                    probeSideReader.close();
 
-                    int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
-                    int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
-                    int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
+                    try {
+                        int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
+                        int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+                        int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
 
-                    BitSet rPStatus = rHHj.getPartitionStatus();
-                    if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
-                        if (LOGGER.isLoggable(Level.FINE)) {
-                            LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH "
-                                    + "(isLeftOuter || build<probe) - [Level " + level + "]");
-                        }
-                        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
-                            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-                            int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
-                            int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
+                        BitSet rPStatus = rHHj.getPartitionStatus();
+                        if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
+                            //Case 2.1.1 - Keep applying HHJ
+                            if (LOGGER.isLoggable(Level.FINE)) {
+                                LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH "
+                                        + "(isLeftOuter || build<probe) - [Level " + level + "]");
+                            }
+                            for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+                                int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
+                                int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
 
-                            if (rbrfw == null || rprfw == null) {
-                                if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
-                                    appendNullToProbeTuples(rprfw);
+                                if (rbrfw == null || rprfw == null) {
+                                    if (isLeftOuter && rprfw != null) {
+                                        // For the outer join, we don't reverse the role.
+                                        appendNullToProbeTuples(rprfw);
+                                    }
+                                    continue;
                                 }
-                                continue;
-                            }
 
-                            if (isReversed) {
-                                joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
-                            } else {
-                                joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
-                            }
-                        }
-
-                    } else { //Case 2.1.2 - Switch to NLJ
-                        if (LOGGER.isLoggable(Level.FINE)) {
-                            LOGGER.fine(
-                                    "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe)"
-                                            + " - [Level " + level + "]");
-                        }
-                        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
-                            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
-                            if (rbrfw == null || rprfw == null) {
-                                if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
-                                    appendNullToProbeTuples(rprfw);
+                                if (isReversed) {
+                                    joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
+                                } else {
+                                    joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
                                 }
-                                continue;
                             }
 
-                            int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
-                            int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
-                            // NLJ order is outer + inner, the order is reversed from the other joins
-                            if (isLeftOuter || probeSideInTups < buildSideInTups) {
-                                applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw); //checked-modified
-                            } else {
-                                applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw); //checked-modified
+                        } else { //Case 2.1.2 - Switch to NLJ
+                            if (LOGGER.isLoggable(Level.FINE)) {
+                                LOGGER.fine(
+                                        "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH "
+                                                + "(isLeftOuter || build<probe) - [Level " + level + "]");
+                            }
+                            for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
+                                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+
+                                if (rbrfw == null || rprfw == null) {
+                                    if (isLeftOuter && rprfw != null) {
+                                        // For the outer join, we don't reverse the role.
+                                        appendNullToProbeTuples(rprfw);
+                                    }
+                                    continue;
+                                }
+
+                                int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
+                                int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
+                                // NLJ order is outer + inner, the order is reversed from the other joins
+                                if (isLeftOuter || probeSideInTups < buildSideInTups) {
+                                    //checked-modified
+                                    applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw);
+                                } else {
+                                    //checked-modified
+                                    applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw);
+                                }
                             }
                         }
+                    } catch (Exception e) {
+                        // Make sure that temporary run files generated in recursive hybrid hash joins
+                        // are closed and deleted.
+                        rHHj.clearBuildTempFiles();
+                        rHHj.clearProbeTempFiles();
+                        throw e;
                     }
                 }
 
@@ -635,17 +689,20 @@
                     if (probeTupleAccessor == null) {
                         probeTupleAccessor = new FrameTupleAccessor(probeRd);
                     }
-                    probReader.open();
-                    while (probReader.nextFrame(rPartbuff)) {
-                        probeTupleAccessor.reset(rPartbuff.getBuffer());
-                        for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
-                            FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
+                    try {
+                        probReader.open();
+                        while (probReader.nextFrame(rPartbuff)) {
+                            probeTupleAccessor.reset(rPartbuff.getBuffer());
+                            for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
+                                FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
                                     nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
                                     nullTupleBuild.getSize());
+                            }
                         }
+                        nullResultAppender.write(writer, true);
+                    } finally {
+                        probReader.close();
                     }
-                    probReader.close();
-                    nullResultAppender.write(writer, true);
                 }
 
                 private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
@@ -654,9 +711,7 @@
                         throws HyracksDataException {
                     boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                             && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
-
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
-
                     IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx,
                             state.memForJoin * ctx.getInitialFrameSize());
                     ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
@@ -667,39 +722,52 @@
                             new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nonMatchWriter, table,
                             predEvaluator, isReversed, bufferManager);
 
-                    bReader.open();
-                    rPartbuff.reset();
-                    while (bReader.nextFrame(rPartbuff)) {
-                        // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
-                        // in the InMemoryHashJoin.
-                        ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
-                        // If a frame cannot be allocated, there may be a chance if we can compact the table,
-                        // one or more frame may be reclaimed.
-                        if (copyBuffer == null) {
-                            if (joiner.compactHashTable() > 0) {
-                                copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
-                            }
+                    try {
+                        bReader.open();
+                        rPartbuff.reset();
+                        while (bReader.nextFrame(rPartbuff)) {
+                            // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
+                            // in the InMemoryHashJoin.
+                            ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+                            // If a frame cannot be allocated, there is a chance that compacting the
+                            // hash table will reclaim one or more frames.
                             if (copyBuffer == null) {
-                                // Still no frame is allocated? At this point, we have no way to get a frame.
-                                throw new HyracksDataException(
-                                        "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+                                if (joiner.compactHashTable() > 0) {
+                                    copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+                                }
+                                if (copyBuffer == null) {
+                                    // Still no frame is allocated? At this point, we have no way to get a frame.
+                                    throw new HyracksDataException(
+                                            "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+                                }
                             }
+                            FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
+                            joiner.build(copyBuffer);
+                            rPartbuff.reset();
                         }
-                        FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
-                        joiner.build(copyBuffer);
-                        rPartbuff.reset();
+                    } finally {
+                        bReader.close();
                     }
-                    bReader.close();
-                    rPartbuff.reset();
-                    //probe
-                    pReader.open();
-                    while (pReader.nextFrame(rPartbuff)) {
-                        joiner.join(rPartbuff.getBuffer(), writer);
+                    try {
+                        //probe
+                        pReader.open();
                         rPartbuff.reset();
+                        try {
+                            while (pReader.nextFrame(rPartbuff)) {
+                                joiner.join(rPartbuff.getBuffer(), writer);
+                                rPartbuff.reset();
+                            }
+                            joiner.completeJoin(writer);
+                        } finally {
+                            joiner.releaseMemory();
+                        }
+                    } finally {
+                        try {
+                            pReader.close();
+                        } finally {
+                            joiner.closeTable();
+                        }
                     }
-                    pReader.close();
-                    joiner.closeJoin(writer);
-                    joiner.closeTable();
                 }
 
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
@@ -716,40 +784,38 @@
                     nlj.setIsReversed(isReversed);
 
                     IFrame cacheBuff = new VSizeFrame(ctx);
-                    innerReader.open();
-                    while (innerReader.nextFrame(cacheBuff)) {
-                        nlj.cache(cacheBuff.getBuffer());
-                        cacheBuff.reset();
+                    try {
+                        innerReader.open();
+                        while (innerReader.nextFrame(cacheBuff)) {
+                            nlj.cache(cacheBuff.getBuffer());
+                            cacheBuff.reset();
+                        }
+                    } finally {
+                        try {
+                            nlj.closeCache();
+                        } finally {
+                            innerReader.close();
+                        }
                     }
-                    nlj.closeCache();
-
-                    IFrame joinBuff = new VSizeFrame(ctx);
-                    outerReader.open();
-
-                    while (outerReader.nextFrame(joinBuff)) {
-                        nlj.join(joinBuff.getBuffer(), writer);
-                        joinBuff.reset();
+                    try {
+                        IFrame joinBuff = new VSizeFrame(ctx);
+                        outerReader.open();
+                        try {
+                            while (outerReader.nextFrame(joinBuff)) {
+                                nlj.join(joinBuff.getBuffer(), writer);
+                                joinBuff.reset();
+                            }
+                            nlj.completeJoin(writer);
+                        } finally {
+                            nlj.releaseMemory();
+                        }
+                    } finally {
+                        outerReader.close();
                     }
-
-                    nlj.closeJoin(writer);
-                    outerReader.close();
-                    innerReader.close();
                 }
             };
             return op;
         }
     }
 
-    public void setSkipInMemHJ(boolean b) {
-        skipInMemoryHJ = b;
-    }
-
-    public void setForceNLJ(boolean b) {
-        forceNLJ = b;
-    }
-
-    public void setForceRR(boolean b) {
-        forceRoleReversal = !isLeftOuter && b;
-    }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 4253114..e7da174 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -82,14 +82,12 @@
                 while (in.nextFrame(frame)) {
                     writer.nextFrame(frame.getBuffer());
                 }
-            } catch (Throwable th) {
-                throw new HyracksDataException(th);
             } finally {
                 in.close();
             }
-        } catch (Throwable th) {
+        } catch (Exception e) {
             writer.fail();
-            throw new HyracksDataException(th);
+            throw e;
         } finally {
             writer.close();
             if (numConsumers.decrementAndGet() == 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 90b4b6c..b422ef4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -80,7 +80,8 @@
         final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc);
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
-            IFrameWriter datasetPartitionWriter;
+            private IFrameWriter datasetPartitionWriter;
+            private boolean failed = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -110,15 +111,22 @@
 
             @Override
             public void fail() throws HyracksDataException {
+                failed = true;
                 datasetPartitionWriter.fail();
             }
 
             @Override
             public void close() throws HyracksDataException {
-                if (frameOutputStream.getTupleCount() > 0) {
-                    frameOutputStream.flush(datasetPartitionWriter);
+                try {
+                    if (!failed && frameOutputStream.getTupleCount() > 0) {
+                        frameOutputStream.flush(datasetPartitionWriter);
+                    }
+                } catch (Exception e) {
+                    datasetPartitionWriter.fail();
+                    throw e;
+                } finally {
+                    datasetPartitionWriter.close();
                 }
-                datasetPartitionWriter.close();
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index 6d9d085..f4158ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -84,10 +84,13 @@
                 finalWriter = prepareSkipMergingFinalResultWriter(writer);
                 finalWriter.open();
                 if (sorter != null) {
-                    if (sorter.hasRemaining()) {
-                        sorter.flush(finalWriter);
+                    try {
+                        if (sorter.hasRemaining()) {
+                            sorter.flush(finalWriter);
+                        }
+                    } finally {
+                        sorter.close();
                     }
-                    sorter.close();
                 }
             } else {
                 /** recycle sort buffer */
@@ -128,10 +131,15 @@
                             RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
                             IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
 
-                            mergeResultWriter.open();
-                            merge(mergeResultWriter, partialRuns);
-                            mergeResultWriter.close();
-
+                            try {
+                                mergeResultWriter.open();
+                                merge(mergeResultWriter, partialRuns);
+                            } catch (Throwable t) {
+                                mergeResultWriter.fail();
+                                throw t;
+                            } finally {
+                                mergeResultWriter.close();
+                            }
                             reader = mergeFileWriter.createReader();
                         }
                         runs.add(reader);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
index cdabcda..ef9e4b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
@@ -103,7 +103,7 @@
                     writer.nextFrame(frame);
                 } catch (Exception e) {
                     writer.fail();
-                    throw new HyracksDataException(e);
+                    throw e;
                 } finally {
                     writer.close();
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
index bba18b3..683857f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -84,9 +84,9 @@
             public void initialize() throws HyracksDataException {
                 try {
                     writer.open();
-                } catch (Throwable th) {
+                } catch (Exception e) {
                     writer.fail();
-                    throw new HyracksDataException(th);
+                    throw e;
                 } finally {
                     writer.close();
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
index f83ab6a..521dff1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
@@ -227,12 +227,14 @@
 
         ResultValidateWriter writer = new ResultValidateWriter(keyValueMap);
 
-        getBuilder().open();
-        for (IFrame frame : input) {
-            getBuilder().nextFrame(frame.getBuffer());
+        try {
+            getBuilder().open();
+            for (IFrame frame : input) {
+                getBuilder().nextFrame(frame.getBuffer());
+            }
+        } finally {
+            getBuilder().close();
         }
-        getBuilder().close();
-
         getMerger().setOutputFrameWriter(0, writer, outputRec);
         getMerger().initialize();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index 673c6fa..cfd4f30 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -130,13 +130,16 @@
 
         assertTrue(runs.size() > 0);
         for (GeneratedRunFileReader run : runs) {
-            run.open();
-            int preKey = Integer.MIN_VALUE;
-            while (run.nextFrame(frame)) {
-                fta.reset(frame.getBuffer());
-                preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
+            try {
+                run.open();
+                int preKey = Integer.MIN_VALUE;
+                while (run.nextFrame(frame)) {
+                    fta.reset(frame.getBuffer());
+                    preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
+                }
+            } finally {
+                run.close();
             }
-            run.close();
         }
         assertTrue(keyValuePair.isEmpty());
     }