[NO ISSUE][RT] Ensure Fail is Called on RunFileWriter
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- When OptimizedHybridHashJoin fails, ensure that fail is
called on any RunFileWriter that was initialized. This
will ensure that any open run files are closed.
Change-Id: I27fa54367045e90540ef571a4cf33723aca66c53
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5924
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index 6e6d342..220311e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -96,7 +96,9 @@
} catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
// NOSONAR ignore logging failure
}
- root.addSuppressed(th);
+ if (root != null) {
+ root.addSuppressed(th);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 68b4b7b..59bb7ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
@@ -219,6 +220,19 @@
}
}
+ public void fail() throws HyracksDataException {
+ for (RunFileWriter writer : buildRFWriters) {
+ if (writer != null) {
+ CleanupUtils.fail(writer, null);
+ }
+ }
+ for (RunFileWriter writer : probeRFWriters) {
+ if (writer != null) {
+ CleanupUtils.fail(writer, null);
+ }
+ }
+ }
+
private void closeAllSpilledPartitions(RunFileWriter[] runFileWriters, String refName) throws HyracksDataException {
try {
for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 1819b8d..97f9c24 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -273,7 +273,7 @@
ITuplePartitionComputer buildHpc =
new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories)
.createPartitioner(0);
- boolean isFailed = false;
+ boolean failed = false;
@Override
public void open() throws HyracksDataException {
@@ -302,21 +302,24 @@
@Override
public void close() throws HyracksDataException {
if (state.hybridHJ != null) {
- state.hybridHJ.closeBuild();
- if (isFailed) {
- state.hybridHJ.clearBuildTempFiles();
- } else {
+ if (!failed) {
+ state.hybridHJ.closeBuild();
ctx.setStateObject(state);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("OptimizedHybridHashJoin closed its build phase");
}
+ } else {
+ state.hybridHJ.clearBuildTempFiles();
}
}
}
@Override
public void fail() throws HyracksDataException {
- isFailed = true;
+ failed = true;
+ if (state.hybridHJ != null) {
+ state.hybridHJ.fail();
+ }
}
@Override
@@ -401,6 +404,9 @@
@Override
public void fail() throws HyracksDataException {
failed = true;
+ if (state.hybridHJ != null) {
+ state.hybridHJ.fail();
+ }
writer.fail();
}
@@ -447,6 +453,9 @@
joinPartitionPair(bReader, pReader, bSize, pSize, 1);
}
} catch (Exception e) {
+ if (state.hybridHJ != null) {
+ state.hybridHJ.fail();
+ }
// Since writer.nextFrame() is called in the above "try" body, we have to call writer.fail()
// to send the failure signal to the downstream, when there is a throwable thrown.
writer.fail();