Cancel the on-going job if waitForCompletion is interrupted.
Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1825
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 4b3aff2..ad54110 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -138,7 +138,13 @@
@Override
public void waitForCompletion(JobId jobId) throws Exception {
- hci.waitForCompletion(jobId);
+ try {
+ hci.waitForCompletion(jobId);
+ } catch (InterruptedException e) {
+ // Cancels an on-going job if the current thread gets interrupted.
+ hci.cancelJob(jobId);
+ throw e;
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 148d4f5..05a7e2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -26,8 +26,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -40,6 +38,7 @@
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.hyracks.control.cc.BaseCCApplication;
@@ -52,8 +51,9 @@
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
public abstract class AbstractMultiNCIntegrationTest {
@@ -70,9 +70,6 @@
private final List<File> outputFiles;
- @Rule
- public TemporaryFolder outputFolder = new TemporaryFolder();
-
public AbstractMultiNCIntegrationTest() {
outputFiles = new ArrayList<>();
}
@@ -133,6 +130,10 @@
hcc.waitForCompletion(jobId);
}
+ protected JobStatus getJobStatus(JobId jobId) throws Exception {
+ return hcc.getJobStatus(jobId);
+ }
+
protected void cancelJob(JobId jobId) throws Exception {
hcc.cancelJob(jobId);
}
@@ -207,15 +208,6 @@
}
}
- protected File createTempFile() throws IOException {
- File tempFile = File.createTempFile(getClass().getName(), ".tmp", outputFolder.getRoot());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Output file: " + tempFile.getAbsolutePath());
- }
- outputFiles.add(tempFile);
- return tempFile;
- }
-
public static class DummyApplication extends BaseCCApplication {
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
index 7c3b66f..7eba9e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.io.ManagedFileSplit;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -61,6 +62,14 @@
public class CancelJobTest extends AbstractMultiNCIntegrationTest {
@Test
+ public void interruptJobClientAfterWaitForCompletion() throws Exception {
+ // Interrupts the job client after waitForCompletion() is called.
+ for (JobSpecification spec : testJobs()) {
+ interruptAfterWaitForCompletion(spec);
+ }
+ }
+
+ @Test
public void cancelExecutingJobAfterWaitForCompletion() throws Exception {
//Cancels executing jobs after waitForCompletion() is called.
for (JobSpecification spec : testJobs()) {
@@ -167,6 +176,38 @@
}
}
+ private void interruptAfterWaitForCompletion(JobSpecification spec) throws Exception {
+ // Submits the job
+ final JobId jobIdForInterruptTest = startJob(spec);
+
+ // Waits for completion in anther thread
+ Thread thread = new Thread(() -> {
+ try {
+ waitForCompletion(jobIdForInterruptTest);
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof InterruptedException);
+ }
+ });
+ thread.start();
+
+ // Interrupts the wait-for-completion thread.
+ thread.interrupt();
+
+ // Waits until the thread terminates.
+ thread.join();
+
+ // Verifies the job status.
+ JobStatus jobStatus = getJobStatus(jobIdForInterruptTest);
+ while (jobStatus == JobStatus.RUNNING) {
+ synchronized (this) {
+ // Since job cancellation is asynchronous on NCs, we have to wait there.
+ wait(1000);
+ }
+ jobStatus = getJobStatus(jobIdForInterruptTest);
+ }
+ Assert.assertTrue(jobStatus == JobStatus.FAILURE);
+ }
+
private void cancelWithoutWait(JobSpecification spec) throws Exception {
JobId jobId = startJob(spec);
cancelJob(jobId);