Cancel the on-going job if waitForCompletion is interrupted.

Change-Id: I3417271660e815a13fd706e1cc057bca6a625c37
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1825
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 4b3aff2..ad54110 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -138,7 +138,13 @@
 
     @Override
     public void waitForCompletion(JobId jobId) throws Exception {
-        hci.waitForCompletion(jobId);
+        try {
+            hci.waitForCompletion(jobId);
+        } catch (InterruptedException e) {
+            // Cancels an on-going job if the current thread gets interrupted.
+            hci.cancelJob(jobId);
+            throw e;
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 148d4f5..05a7e2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -26,8 +26,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -40,6 +38,7 @@
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.client.dataset.HyracksDataset;
 import org.apache.hyracks.control.cc.BaseCCApplication;
@@ -52,8 +51,9 @@
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 
 public abstract class AbstractMultiNCIntegrationTest {
 
@@ -70,9 +70,6 @@
 
     private final List<File> outputFiles;
 
-    @Rule
-    public TemporaryFolder outputFolder = new TemporaryFolder();
-
     public AbstractMultiNCIntegrationTest() {
         outputFiles = new ArrayList<>();
     }
@@ -133,6 +130,10 @@
         hcc.waitForCompletion(jobId);
     }
 
+    protected JobStatus getJobStatus(JobId jobId) throws Exception {
+        return hcc.getJobStatus(jobId);
+    }
+
     protected void cancelJob(JobId jobId) throws Exception {
         hcc.cancelJob(jobId);
     }
@@ -207,15 +208,6 @@
         }
     }
 
-    protected File createTempFile() throws IOException {
-        File tempFile = File.createTempFile(getClass().getName(), ".tmp", outputFolder.getRoot());
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Output file: " + tempFile.getAbsolutePath());
-        }
-        outputFiles.add(tempFile);
-        return tempFile;
-    }
-
     public static class DummyApplication extends BaseCCApplication {
 
         @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
index 7c3b66f..7eba9e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.ClusterCapacity;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -61,6 +62,14 @@
 public class CancelJobTest extends AbstractMultiNCIntegrationTest {
 
     @Test
+    public void interruptJobClientAfterWaitForCompletion() throws Exception {
+        // Interrupts the job client after waitForCompletion() is called.
+        for (JobSpecification spec : testJobs()) {
+            interruptAfterWaitForCompletion(spec);
+        }
+    }
+
+    @Test
     public void cancelExecutingJobAfterWaitForCompletion() throws Exception {
         //Cancels executing jobs after waitForCompletion() is called.
         for (JobSpecification spec : testJobs()) {
@@ -167,6 +176,38 @@
         }
     }
 
+    private void interruptAfterWaitForCompletion(JobSpecification spec) throws Exception {
+        // Submits the job
+        final JobId jobIdForInterruptTest = startJob(spec);
+
+        // Waits for completion in anther thread
+        Thread thread = new Thread(() -> {
+            try {
+                waitForCompletion(jobIdForInterruptTest);
+            } catch (Exception e) {
+                Assert.assertTrue(e instanceof InterruptedException);
+            }
+        });
+        thread.start();
+
+        // Interrupts the wait-for-completion thread.
+        thread.interrupt();
+
+        // Waits until the thread terminates.
+        thread.join();
+
+        // Verifies the job status.
+        JobStatus jobStatus = getJobStatus(jobIdForInterruptTest);
+        while (jobStatus == JobStatus.RUNNING) {
+            synchronized (this) {
+                // Since job cancellation is asynchronous on NCs, we have to wait there.
+                wait(1000);
+            }
+            jobStatus = getJobStatus(jobIdForInterruptTest);
+        }
+        Assert.assertTrue(jobStatus == JobStatus.FAILURE);
+    }
+
     private void cancelWithoutWait(JobSpecification spec) throws Exception {
         JobId jobId = startJob(spec);
         cancelJob(jobId);