[NO ISSUE][HYR] Fix wait for completion work
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- When wait for completion is called on a job that has not been
  created yet, an exception is returned.
- When wait for completion is called on a job that has been cleared
  from the job archive, its result is retrieved correctly from job
  history.
- When wait for completion is called on a job that has been cleared
  from job history, an exception is returned.
- Test cases that failed before the fix have been added.
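A minimal sketch of the client-visible behavior after this change
(illustrative only; it assumes a running cluster reachable through an
IHyracksClientConnection, as in the integration tests below, and the
class/method names here are hypothetical):

    import org.apache.hyracks.api.client.IHyracksClientConnection;
    import org.apache.hyracks.api.job.JobId;

    public final class WaitSemanticsSketch {
        // hcc must point at a live cluster controller.
        static void check(IHyracksClientConnection hcc) {
            // A job id the JobIdFactory has never issued: waitForCompletion
            // now fails with "has not been created yet" (error code 93)
            // instead of blocking forever.
            JobId neverCreated = new JobId(Long.MAX_VALUE);
            try {
                hcc.waitForCompletion(neverCreated);
            } catch (Exception e) {
                // expected: e.toString() contains "has not been created yet"
            }
            // A job that was created but has aged out of both the job archive
            // and the job history fails with "has been cleared from job
            // history" (error code 92) in the same way.
        }
    }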
Change-Id: I9e50f6ce1df9f27517d7ec3a3f8a5d38246f71ff
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1999
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index ff98efa..cf83bca 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -125,6 +125,8 @@
public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
public static final int ILLEGAL_MEMORY_BUDGET = 90;
public static final int TIMEOUT = 91;
+ public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92;
+ public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index dd63786..eea6b52 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -27,6 +27,10 @@
return new JobId(id.getAndIncrement());
}
+ public long maxJobId() {
+ return id.get();
+ }
+
public void ensureMinimumId(long id) {
this.id.updateAndGet(current -> Math.max(current, id));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 6d4ccdb..56abee5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -108,5 +108,7 @@
89 = The byte size of a single group (%1$s bytes) exceeds the budget for a group by operator (%2$s bytes)
90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes)
91 = Operation timed out
+92 = Job %1$s has been cleared from job history
+93 = Job %1$s has not been created yet
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index b3e65ad..327c422 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -60,8 +60,7 @@
}
@Override
- public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
- Exception exception) {
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
switch (fn.getFunctionId()) {
case GET_CLUSTER_CONTROLLER_INFO:
@@ -86,7 +85,7 @@
case DISTRIBUTE_JOB:
HyracksClientInterfaceFunctions.DistributeJobFunction djf =
(HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
- ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory.create(),
+ ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory,
new IPCResponder<JobId>(handle, mid)));
break;
case DESTROY_JOB:
@@ -104,42 +103,30 @@
case START_JOB:
HyracksClientInterfaceFunctions.StartJobFunction sjf =
(HyracksClientInterfaceFunctions.StartJobFunction) fn;
- JobId jobId = sjf.getJobId();
- byte[] acggfBytes = null;
- boolean predistributed = false;
- if (jobId == null) {
- //The job is new
- jobId = jobIdFactory.create();
- acggfBytes = sjf.getACGGFBytes();
- } else {
- //The job has been predistributed. We don't need to send an ActivityClusterGraph
- predistributed = true;
- }
- ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
- jobId, new IPCResponder<JobId>(handle, mid), predistributed));
+ ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), sjf.getACGGFBytes(),
+ sjf.getJobFlags(), sjf.getJobId(), new IPCResponder<JobId>(handle, mid), jobIdFactory));
break;
case GET_DATASET_DIRECTORY_SERIVICE_INFO:
- ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
- new IPCResponder<NetworkAddress>(handle, mid)));
+ ccs.getWorkQueue().schedule(
+ new GetDatasetDirectoryServiceInfoWork(ccs, new IPCResponder<NetworkAddress>(handle, mid)));
break;
case GET_DATASET_RESULT_STATUS:
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrsf =
(HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
- ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(),
- gdrsf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
+ ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(), gdrsf.getResultSetId(),
+ new IPCResponder<Status>(handle, mid)));
break;
case GET_DATASET_RESULT_LOCATIONS:
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
(HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
- ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs,
- gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
- new IPCResponder<>(handle, mid)));
+ ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs, gdrlf.getJobId(),
+ gdrlf.getResultSetId(), gdrlf.getKnownRecords(), new IPCResponder<>(handle, mid)));
break;
case WAIT_FOR_COMPLETION:
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
(HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
- ccs.getWorkQueue().schedule(new WaitForJobCompletionWork(ccs, wfcf.getJobId(),
- new IPCResponder<>(handle, mid)));
+ ccs.getWorkQueue()
+ .schedule(new WaitForJobCompletionWork(ccs, wfcf.getJobId(), new IPCResponder<>(handle, mid)));
break;
case GET_NODE_CONTROLLERS_INFO:
ccs.getWorkQueue().schedule(
@@ -155,33 +142,33 @@
case CLI_DEPLOY_BINARY:
HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
(HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
- ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(),
- dbf.getDeploymentId(), new IPCResponder<>(handle, mid)));
+ ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(), dbf.getDeploymentId(),
+ new IPCResponder<>(handle, mid)));
break;
case CLI_UNDEPLOY_BINARY:
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
(HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
- ccs.getWorkQueue().schedule(new CliUnDeployBinaryWork(ccs, udbf.getDeploymentId(),
- new IPCResponder<>(handle, mid)));
+ ccs.getWorkQueue().schedule(
+ new CliUnDeployBinaryWork(ccs, udbf.getDeploymentId(), new IPCResponder<>(handle, mid)));
break;
case CLUSTER_SHUTDOWN:
HyracksClientInterfaceFunctions.ClusterShutdownFunction csf =
(HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
- ccs.getWorkQueue().schedule(new ClusterShutdownWork(ccs,
- csf.isTerminateNCService(), new IPCResponder<>(handle, mid)));
+ ccs.getWorkQueue().schedule(
+ new ClusterShutdownWork(ccs, csf.isTerminateNCService(), new IPCResponder<>(handle, mid)));
break;
case GET_NODE_DETAILS_JSON:
HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf =
(HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
ccs.getWorkQueue()
.schedule(new GetNodeDetailsJSONWork(ccs.getNodeManager(), ccs.getCCConfig(), gndjf.getNodeId(),
- gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
+ gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
break;
case THREAD_DUMP:
HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
(HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
- ccs.getWorkQueue().schedule(new GetThreadDumpWork(ccs, tdf.getNode(),
- new IPCResponder<String>(handle, mid)));
+ ccs.getWorkQueue()
+ .schedule(new GetThreadDumpWork(ccs, tdf.getNode(), new IPCResponder<String>(handle, mid)));
break;
default:
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4ba847d..c6d90a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -46,8 +46,8 @@
import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
import org.apache.hyracks.control.cc.scheduler.IJobQueue;
import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.work.NoOpCallback;
import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.NoOpCallback;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -92,7 +92,7 @@
runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
private static final long serialVersionUID = 1L;
/** history size + 1 is for the case when history size = 0 */
- private int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
+ private final int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
@Override
protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
@@ -277,7 +277,8 @@
@Override
public List<Exception> getExceptionHistory(JobId jobId) {
- return runMapHistory.get(jobId);
+ List<Exception> exceptions = runMapHistory.get(jobId);
+ return exceptions == null ? runMapHistory.containsKey(jobId) ? Collections.emptyList() : null : exceptions;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
index e5fd66a..5a57b1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
@@ -37,12 +38,12 @@
public class DistributeJobWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final byte[] acggfBytes;
- private final JobId jobId;
+ private final JobIdFactory jobIdFactory;
private final IResultCallback<JobId> callback;
- public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobId jobId,
+ public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobIdFactory jobIdFactory,
IResultCallback<JobId> callback) {
- this.jobId = jobId;
+ this.jobIdFactory = jobIdFactory;
this.ccs = ccs;
this.acggfBytes = acggfBytes;
this.callback = callback;
@@ -51,6 +52,7 @@
@Override
protected void doRun() throws Exception {
try {
+ JobId jobId = jobIdFactory.create();
final CCServiceContext ccServiceCtx = ccs.getContext();
ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
IActivityClusterGraphGeneratorFactory acggf =
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index e083d2a..ed82705 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.cc.job.IJobManager;
@@ -38,19 +39,19 @@
private final byte[] acggfBytes;
private final EnumSet<JobFlag> jobFlags;
private final DeploymentId deploymentId;
- private final JobId jobId;
+ private final JobId preDistributedJobId;
private final IResultCallback<JobId> callback;
- private final boolean predestributed;
+ private final JobIdFactory jobIdFactory;
public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
- EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, boolean predestributed) {
+ EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, JobIdFactory jobIdFactory) {
this.deploymentId = deploymentId;
- this.jobId = jobId;
+ this.preDistributedJobId = jobId;
this.ccs = ccs;
this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
this.callback = callback;
- this.predestributed = predestributed;
+ this.jobIdFactory = jobIdFactory;
}
@Override
@@ -58,8 +59,10 @@
IJobManager jobManager = ccs.getJobManager();
try {
final CCServiceContext ccServiceCtx = ccs.getContext();
+ JobId jobId;
JobRun run;
- if (!predestributed) {
+ if (preDistributedJobId == null) {
+ jobId = jobIdFactory.create();
//Need to create the ActivityClusterGraph
IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
.deserialize(acggfBytes, deploymentId, ccServiceCtx);
@@ -67,6 +70,7 @@
acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, jobFlags);
run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
} else {
+ jobId = preDistributedJobId;
//ActivityClusterGraph has already been distributed
run = new JobRun(ccs, deploymentId, jobId, jobFlags,
ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index 713cf96..f1d9a4d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -18,12 +18,15 @@
*/
package org.apache.hyracks.control.cc.work;
+import java.util.Collections;
import java.util.List;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.IJobManager;
-import org.apache.hyracks.control.cc.job.IJobStatusConditionVariable;
+import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -41,13 +44,13 @@
@Override
protected void doRun() throws Exception {
IJobManager jobManager = ccs.getJobManager();
- final IJobStatusConditionVariable cRunningVar = jobManager.get(jobId);
- if (cRunningVar != null) {
+ final JobRun jobRun = jobManager.get(jobId);
+ if (jobRun != null) {
ccs.getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
- cRunningVar.waitForCompletion();
+ jobRun.waitForCompletion();
callback.setValue(null);
} catch (Exception e) {
callback.setException(e);
@@ -55,18 +58,28 @@
}
});
} else {
- final List<Exception> exceptions = jobManager.getExceptionHistory(jobId);
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
+ // Couldn't find jobRun
+ List<Exception> exceptionHistory = jobManager.getExceptionHistory(jobId);
+ List<Exception> exceptions;
+ if (exceptionHistory == null) {
+ // couldn't be found
+ long maxJobId = ccs.getJobIdFactory().maxJobId();
+ exceptions = Collections.singletonList(jobId.getId() <= maxJobId
+ ? HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId)
+ : HyracksDataException.create(ErrorCode.JOB_HAS_NOT_BEEN_CREATED_YET, jobId));
+
+ } else {
+ exceptions = exceptionHistory;
+ }
+ ccs.getExecutor().execute(() -> {
+ if (!exceptions.isEmpty()) {
+ /**
+ * only report the first exception because IResultCallback will only throw one exception
+ * anyway
+ */
+ callback.setException(exceptions.get(0));
+ } else {
callback.setValue(null);
- if (exceptions != null && !exceptions.isEmpty()) {
- /**
- * only report the first exception because IResultCallback will only throw one exception
- * anyway
- */
- callback.setException(exceptions.get(0));
- }
}
});
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 18479e2..21c1e77 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -62,13 +62,13 @@
public static final String[] ASTERIX_IDS =
{ "asterix-001", "asterix-002", "asterix-003", "asterix-004", "asterix-005", "asterix-006", "asterix-007" };
- private static ClusterControllerService cc;
+ protected static ClusterControllerService cc;
- private static NodeControllerService[] asterixNCs;
+ protected static NodeControllerService[] asterixNCs;
- private static IHyracksClientConnection hcc;
+ protected static IHyracksClientConnection hcc;
- private final List<File> outputFiles;
+ protected final List<File> outputFiles;
public AbstractMultiNCIntegrationTest() {
outputFiles = new ArrayList<>();
@@ -82,6 +82,7 @@
ccConfig.setClusterListenAddress("127.0.0.1");
ccConfig.setClusterListenPort(39001);
ccConfig.setProfileDumpPeriod(10000);
+ ccConfig.setJobHistorySize(2);
File outDir = new File("target" + File.separator + "ClusterController");
outDir.mkdirs();
File ccRoot = File.createTempFile(AbstractMultiNCIntegrationTest.class.getName(), ".data", outDir);
@@ -186,24 +187,30 @@
readSize = reader.read(resultFrame);
}
}
+ waitForCompletion(jobId, expectedErrorMessage);
+ // Waiting a second time should lead to the same behavior
+ waitForCompletion(jobId, expectedErrorMessage);
+ dumpOutputFiles();
+ }
+
+ protected void waitForCompletion(JobId jobId, String expectedErrorMessage) throws Exception {
boolean expectedExceptionThrown = false;
try {
hcc.waitForCompletion(jobId);
- } catch (HyracksDataException hde) {
+ } catch (Exception e) {
if (expectedErrorMessage != null) {
- if (hde.toString().contains(expectedErrorMessage)) {
+ if (e.toString().contains(expectedErrorMessage)) {
expectedExceptionThrown = true;
} else {
- throw hde;
+ throw e;
}
} else {
- throw hde;
+ throw e;
}
}
if (expectedErrorMessage != null && !expectedExceptionThrown) {
throw new Exception("Expected error (" + expectedErrorMessage + ") was not thrown");
}
- dumpOutputFiles();
}
private void dumpOutputFiles() {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 13a103e..34b1480 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -20,6 +20,7 @@
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -32,9 +33,27 @@
@Test
public void failureOnCreatePushRuntime() throws Exception {
- for (int round = 0; round < 100; ++round) {
+ JobId jobId = new JobId(0); // First job
+ for (int i = 0; i < 20; i++) {
+ execTest();
+ if (i == 0) {
+ // passes. read from job archive
+ waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
+ }
+ }
+ // passes. read from job history
+ waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
+ for (int i = 0; i < 300; i++) {
execTest();
}
+ // passes. history has been cleared
+ waitForCompletion(jobId, "has been cleared from job history");
+ }
+
+ @Test
+ public void waitForNonExistingJob() throws Exception {
+ JobId jobId = new JobId(Long.MAX_VALUE);
+ waitForCompletion(jobId, "has not been created yet");
}
private void execTest() throws Exception {