[NO ISSUE][RT] Ensure Fail is Called on RunFileWriter

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- When OptimizedHybridHashJoin fails, ensure that fail is
  called on any RunFileWriter that was initialized. This
  will ensure that any open run files are closed.

Change-Id: I27fa54367045e90540ef571a4cf33723aca66c53
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5924
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index 6e6d342..220311e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -96,7 +96,9 @@
             } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
                 // NOSONAR ignore logging failure
             }
-            root.addSuppressed(th);
+            if (root != null) {
+                root.addSuppressed(th);
+            }
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 68b4b7b..59bb7ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
@@ -219,6 +220,19 @@
         }
     }
 
+    public void fail() throws HyracksDataException {
+        for (RunFileWriter writer : buildRFWriters) {
+            if (writer != null) {
+                CleanupUtils.fail(writer, null);
+            }
+        }
+        for (RunFileWriter writer : probeRFWriters) {
+            if (writer != null) {
+                CleanupUtils.fail(writer, null);
+            }
+        }
+    }
+
     private void closeAllSpilledPartitions(RunFileWriter[] runFileWriters, String refName) throws HyracksDataException {
         try {
             for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 1819b8d..97f9c24 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -273,7 +273,7 @@
                 ITuplePartitionComputer buildHpc =
                         new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories)
                                 .createPartitioner(0);
-                boolean isFailed = false;
+                boolean failed = false;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -302,21 +302,24 @@
                 @Override
                 public void close() throws HyracksDataException {
                     if (state.hybridHJ != null) {
-                        state.hybridHJ.closeBuild();
-                        if (isFailed) {
-                            state.hybridHJ.clearBuildTempFiles();
-                        } else {
+                        if (!failed) {
+                            state.hybridHJ.closeBuild();
                             ctx.setStateObject(state);
                             if (LOGGER.isTraceEnabled()) {
                                 LOGGER.trace("OptimizedHybridHashJoin closed its build phase");
                             }
+                        } else {
+                            state.hybridHJ.clearBuildTempFiles();
                         }
                     }
                 }
 
                 @Override
                 public void fail() throws HyracksDataException {
-                    isFailed = true;
+                    failed = true;
+                    if (state.hybridHJ != null) {
+                        state.hybridHJ.fail();
+                    }
                 }
 
                 @Override
@@ -401,6 +404,9 @@
                 @Override
                 public void fail() throws HyracksDataException {
                     failed = true;
+                    if (state.hybridHJ != null) {
+                        state.hybridHJ.fail();
+                    }
                     writer.fail();
                 }
 
@@ -447,6 +453,9 @@
                             joinPartitionPair(bReader, pReader, bSize, pSize, 1);
                         }
                     } catch (Exception e) {
+                        if (state.hybridHJ != null) {
+                            state.hybridHJ.fail();
+                        }
                         // Since writer.nextFrame() is called in the above "try" body, we have to call writer.fail()
                         // to send the failure signal to the downstream, when there is a throwable thrown.
                         writer.fail();