Add job cancellation support in Hyracks.
This change also:
- cleans up JobRun;
- adds tests for job cancellation.
Change-Id: Ic26330c19c8642dd3246739b5150c4aa667c359c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1537
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
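
For reference, a minimal client-side sketch of the new cancellation API. This is illustrative only: the cluster-controller address, port, and the empty job specification are placeholders, not part of this change.

import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;

public class CancelJobSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder CC address; a real spec would add operators and connectors.
        IHyracksClientConnection hcc = new HyracksConnection("localhost", 1098);
        JobSpecification spec = new JobSpecification();
        JobId jobId = hcc.startJob(spec);
        hcc.cancelJob(jobId); // the call introduced by this change
        try {
            hcc.waitForCompletion(jobId);
        } catch (Exception e) {
            // The job fails with ErrorCode.JOB_CANCELED:
            // "Job <id> has been cancelled by a user".
        }
    }
}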
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index a651833..29998fd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -6758,7 +6758,7 @@
<test-case FilePath="load">
<compilation-unit name="duplicate-key-error">
<output-dir compare="Text">none</output-dir>
- <expected-error>org.apache.hyracks.api.exceptions.HyracksException</expected-error>
+ <expected-error>Input stream given to BTree bulk load has duplicates</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="load">
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index aa292f6..aa9232e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -38,6 +38,7 @@
START_JOB,
DISTRIBUTE_JOB,
DESTROY_JOB,
+ CANCEL_JOB,
GET_DATASET_DIRECTORY_SERIVICE_INFO,
GET_DATASET_RESULT_STATUS,
GET_DATASET_RESULT_LOCATIONS,
@@ -122,6 +123,25 @@
}
}
+ public static class CancelJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public CancelJobFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CANCEL_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
public static class DestroyJobFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 8e7affb..0142c7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -69,6 +69,13 @@
}
@Override
+ public void cancelJob(JobId jobId) throws Exception {
+ HyracksClientInterfaceFunctions.CancelJobFunction cjf = new HyracksClientInterfaceFunctions.CancelJobFunction(
+ jobId);
+ rpci.call(ipcHandle, cjf);
+ }
+
+ @Override
public JobId startJob(JobId jobId) throws Exception {
HyracksClientInterfaceFunctions.StartJobFunction sjf =
new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 5da1f34..4b3aff2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -91,6 +91,11 @@
}
@Override
+ public void cancelJob(JobId jobId) throws Exception {
+ hci.cancelJob(jobId);
+ }
+
+ @Override
public JobId startJob(JobSpecification jobSpec) throws Exception {
return startJob(jobSpec, EnumSet.noneOf(JobFlag.class));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index e65cacd..0956d85 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -57,6 +57,15 @@
public JobInfo getJobInfo(JobId jobId) throws Exception;
/**
+ * Cancel the job that has the given job id.
+ *
+ * @param jobId
+ * the JobId of the Job
+ * @throws Exception
+ */
+ public void cancelJob(JobId jobId) throws Exception;
+
+ /**
* Start the specified Job.
*
* @param jobSpec
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index f7995d7..1afbe9e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -40,6 +40,8 @@
public JobId startJob(JobId jobId) throws Exception;
+ public void cancelJob(JobId jobId) throws Exception;
+
public JobId distributeJob(byte[] acggfBytes) throws Exception;
public JobId destroyJob(JobId jobId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 1b97a60..333b1df 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -18,7 +18,6 @@
*/
package org.apache.hyracks.api.exceptions;
-import java.io.File;
import java.io.InputStream;
import java.util.Map;
@@ -59,6 +58,8 @@
public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
public static final int DISTRIBUTED_JOB_FAILURE = 23;
public static final int NO_RESULTSET = 24;
+ public static final int JOB_CANCELED = 25;
+ public static final int NODE_FAILED = 26;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 404104d..6c581f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -28,6 +28,13 @@
*/
public class HyracksDataException extends HyracksException {
+ public static HyracksDataException create(Throwable cause) {
+ if (cause instanceof HyracksDataException) {
+ return (HyracksDataException) cause;
+ }
+ return new HyracksDataException(cause);
+ }
+
public static HyracksDataException create(int code, Serializable... params) {
return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
}
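
A standalone illustration of what the new create(Throwable) factory buys (the class name below is made up): a cause that is already a HyracksDataException is returned as-is, so its error code is not buried under another layer of wrapping.

import org.apache.hyracks.api.exceptions.HyracksDataException;

public class WrapOnceSketch {
    public static void main(String[] args) {
        HyracksDataException original = new HyracksDataException("root failure");
        // The factory short-circuits instead of nesting the exception again.
        System.out.println(HyracksDataException.create(original) == original); // true
        // A foreign cause still gets wrapped.
        System.out.println(HyracksDataException.create(new RuntimeException("io"))
                .getCause() instanceof RuntimeException); // true
    }
}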
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 559468d..1f2c7a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -33,6 +33,13 @@
private final String nodeId;
private transient volatile String msgCache;
+ public static HyracksException create(Throwable cause) {
+ if (cause instanceof HyracksException) {
+ return (HyracksException) cause;
+ }
+ return new HyracksException(cause);
+ }
+
public static HyracksException create(int code, Serializable... params) {
return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 2ac392b..1c4f916 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -221,11 +221,11 @@
for (Future<Void> initializationTask : initializationTasks) {
initializationTask.get();
}
- } catch (Throwable th) {
+ } catch (Exception e) {
for (Future<Void> initializationTask : initializationTasks) {
initializationTask.cancel(true);
}
- throw new HyracksDataException(th);
+ throw new HyracksDataException(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 3bf5a9a..12601fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -43,5 +43,7 @@
22 = The distributed job %1$s already exists
23 = The distributed work failed for %1$s at %2$s
24 = No result set for job %1$s
+25 = Job %1$s has been cancelled by a user
+26 = Node %1$s failed
10000 = The given rule collection %1$s is not an instance of the List class.
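
These messages use java.util.Formatter positional arguments; a quick standalone check of what the two new entries render to (the job and node ids are made up):

public class MessageFormatSketch {
    public static void main(String[] args) {
        // Mirrors entries 25 and 26 above, as formatted by the ErrorCode machinery.
        System.out.println(String.format("Job %1$s has been cancelled by a user", "JID:3"));
        System.out.println(String.format("Node %1$s failed", "nc1"));
        // Output:
        // Job JID:3 has been cancelled by a user
        // Node nc1 failed
    }
}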
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 265d3ef..ced3d67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.control.cc.work.CancelJobWork;
import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
@@ -94,6 +95,12 @@
ccs.getWorkQueue()
.schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid)));
break;
+ case CANCEL_JOB:
+ HyracksClientInterfaceFunctions.CancelJobFunction cjf =
+ (HyracksClientInterfaceFunctions.CancelJobFunction) fn;
+ ccs.getWorkQueue().schedule(
+ new CancelJobWork(ccs.getJobManager(), cjf.getJobId(), new IPCResponder<Void>(handle, mid)));
+ break;
case START_JOB:
HyracksClientInterfaceFunctions.StartJobFunction sjf =
(HyracksClientInterfaceFunctions.StartJobFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 8f7b0cb..084bd1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -43,6 +43,7 @@
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityCluster;
import org.apache.hyracks.api.job.ActivityClusterGraph;
@@ -83,6 +84,8 @@
private final Random random;
+ private boolean cancelled = false;
+
public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints,
boolean predistributed) {
this.ccs = ccs;
@@ -112,6 +115,19 @@
ccs.getApplicationContext().notifyJobStart(jobRun.getJobId());
}
+ public void cancelJob() throws HyracksException {
+ // If the job is already terminated or failed, do nothing here.
+ if (jobRun.getPendingStatus() != null) {
+ return;
+ }
+ // Sets the cancelled flag.
+ cancelled = true;
+ // Aborts ongoing task clusters.
+ abortOngoingTaskClusters(ta -> false, ta -> null);
+ // Aborts the whole job.
+ abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId())));
+ }
+
private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
throws HyracksException {
for (ActivityCluster root : roots) {
@@ -661,7 +677,7 @@
ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
abortDoomedTaskClusters();
- if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
+ if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts() || isCancelled()) {
abortJob(exceptions);
return;
}
@@ -691,42 +707,70 @@
jobManager.finalComplete(jobRun);
return;
}
- for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
- if (!isPlanned(ac)) {
- continue;
- }
- TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
- if (taskClusters == null) {
- continue;
- }
- for (TaskCluster tc : taskClusters) {
- TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
- if (lastTaskClusterAttempt == null || !(lastTaskClusterAttempt
- .getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
- || lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
- continue;
- }
- boolean abort = false;
- for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
- assert ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED
- || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING;
- if (deadNodes.contains(ta.getNodeId())) {
- ta.setStatus(TaskAttempt.TaskStatus.FAILED,
- Collections.singletonList(new Exception("Node " + ta.getNodeId() + " failed")));
- ta.setEndTime(System.currentTimeMillis());
- abort = true;
- }
- }
- if (abort) {
- abortTaskCluster(lastTaskClusterAttempt, TaskClusterAttempt.TaskClusterStatus.ABORTED);
- }
- }
- abortDoomedTaskClusters();
- }
+ abortOngoingTaskClusters(ta -> deadNodes.contains(ta.getNodeId()),
+ ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId()));
startRunnableActivityClusters();
} catch (Exception e) {
abortJob(Collections.singletonList(e));
}
}
+ private interface ITaskFilter {
+ boolean directlyMarkAsFailed(TaskAttempt ta);
+ }
+
+ private interface IExceptionGenerator {
+ HyracksException getException(TaskAttempt ta);
+ }
+
+ /**
+ * Aborts ongoing task clusters.
+ *
+ * @param taskFilter,
+ * selects tasks that should be directly marked as failed without doing the aborting RPC.
+ * @param exceptionGenerator,
+ * generates an exception for tasks that are directly marked as failed.
+ */
+ private void abortOngoingTaskClusters(ITaskFilter taskFilter, IExceptionGenerator exceptionGenerator)
+ throws HyracksException {
+ for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
+ if (!isPlanned(ac)) {
+ continue;
+ }
+ TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
+ if (taskClusters == null) {
+ continue;
+ }
+ for (TaskCluster tc : taskClusters) {
+ TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+ if (lastTaskClusterAttempt == null || !(lastTaskClusterAttempt
+ .getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
+ || lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+ continue;
+ }
+ boolean abort = false;
+ for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
+ assert ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED
+ || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING;
+ if (taskFilter.directlyMarkAsFailed(ta)) {
+ // Directly mark it as failed, without further aborting.
+ ta.setStatus(TaskAttempt.TaskStatus.FAILED,
+ Collections.singletonList(exceptionGenerator.getException(ta)));
+ ta.setEndTime(System.currentTimeMillis());
+ abort = true;
+ }
+ }
+ if (abort) {
+ abortTaskCluster(lastTaskClusterAttempt, TaskClusterAttempt.TaskClusterStatus.ABORTED);
+ }
+ }
+ abortDoomedTaskClusters();
+ }
+ }
+
+ // Returns whether the job has been cancelled.
+ private boolean isCancelled() {
+ return cancelled;
+ }
+
}
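
The refactoring above lets cancellation and node-failure handling share one abort routine, differing only in which task attempts are failed in place and with what exception. A self-contained sketch of that parameterization pattern, using hypothetical stand-in types rather than the real JobExecutor API:

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class AbortParameterizationSketch {
    static class Attempt {
        final String nodeId;
        Attempt(String nodeId) { this.nodeId = nodeId; }
    }

    // Mirrors abortOngoingTaskClusters: the predicate picks attempts to fail in
    // place (skipping the abort RPC); the function supplies the exception they carry.
    static void abortOngoing(List<Attempt> ongoing, Predicate<Attempt> failDirectly,
            Function<Attempt, Exception> reason) {
        for (Attempt a : ongoing) {
            if (failDirectly.test(a)) {
                System.out.println("fail in place on " + a.nodeId + ": " + reason.apply(a).getMessage());
            } else {
                System.out.println("abort via RPC on " + a.nodeId);
            }
        }
    }

    public static void main(String[] args) {
        List<Attempt> ongoing = Arrays.asList(new Attempt("nc1"), new Attempt("nc2"));
        // Cancellation: nothing is failed in place; every attempt goes through the abort path.
        abortOngoing(ongoing, a -> false, a -> null);
        // Node failure: attempts on dead nodes are failed in place, since an
        // abort RPC to a dead node could never succeed.
        abortOngoing(ongoing, a -> "nc2".equals(a.nodeId), a -> new Exception("Node " + a.nodeId + " failed"));
    }
}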
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 180e850..21fc08f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -45,6 +45,14 @@
void add(JobRun jobRun) throws HyracksException;
/**
+ * Cancel a job with a given job id.
+ *
+ * @param jobId,
+ * the id of the job.
+ */
+ void cancel(JobId jobId) throws HyracksException;
+
+ /**
* This method is called when the master process decides to complete job.
* The implementation of this method should instruct all involved worker processes to clean the state of each
* individual parallel partition up.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 741e3db..031303b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -47,7 +47,6 @@
import org.apache.hyracks.control.cc.scheduler.IJobQueue;
import org.apache.hyracks.control.cc.work.JobCleanupWork;
import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.work.IResultCallback;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -108,8 +107,7 @@
IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
switch (status) {
case QUEUE:
- jobRun.setStatus(JobStatus.PENDING, null);
- jobQueue.add(jobRun);
+ queueJob(jobRun);
break;
case EXECUTE:
executeJob(jobRun);
@@ -118,6 +116,32 @@
}
@Override
+ public void cancel(JobId jobId) throws HyracksException {
+ if (jobId == null) {
+ return;
+ }
+ // Cancels a running job.
+ if (activeRunMap.containsKey(jobId)) {
+ JobRun jobRun = activeRunMap.get(jobId);
+ // The following call will abort all ongoing tasks and consequently
+ // trigger JobCleanupWork and JobCleanupNotificationWork, which update the lifecycle of the job.
+ // Therefore, we do not remove the job from activeRunMap here.
+ jobRun.getExecutor().cancelJob();
+ return;
+ }
+ // Removes a pending job.
+ JobRun jobRun = jobQueue.remove(jobId);
+ if (jobRun != null) {
+ List<Exception> exceptions = Collections
+ .singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
+ // Since the job has not been executed, we only need to update its status and lifecycle here.
+ jobRun.setStatus(JobStatus.FAILURE, exceptions);
+ runMapArchive.put(jobId, jobRun);
+ runMapHistory.put(jobId, exceptions);
+ }
+ }
+
+ @Override
public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException {
checkJob(run);
if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
@@ -244,9 +268,12 @@
@Override
public JobRun get(JobId jobId) {
- JobRun jobRun = activeRunMap.get(jobId);
+ JobRun jobRun = activeRunMap.get(jobId); // Running job.
if (jobRun == null) {
- jobRun = runMapArchive.get(jobId);
+ jobRun = jobQueue.get(jobId); // Pending job.
+ }
+ if (jobRun == null) {
+ jobRun = runMapArchive.get(jobId); // Completed job.
}
return jobRun;
}
@@ -256,7 +283,7 @@
return runMapHistory.get(jobId);
}
- private void pickJobsToRun() {
+ private void pickJobsToRun() throws HyracksException {
List<JobRun> selectedRuns = jobQueue.pull();
for (JobRun run : selectedRuns) {
executeJob(run);
@@ -264,24 +291,24 @@
}
// Executes a job when the required capacity for the job is met.
- private void executeJob(JobRun run) {
- IResultCallback<JobId> callback = run.getCallback();
- try {
- run.setStartTime(System.currentTimeMillis());
- JobId jobId = run.getJobId();
- activeRunMap.put(jobId, run);
+ private void executeJob(JobRun run) throws HyracksException {
+ run.setStartTime(System.currentTimeMillis());
+ JobId jobId = run.getJobId();
+ activeRunMap.put(jobId, run);
- CCApplicationContext appCtx = ccs.getApplicationContext();
- JobSpecification spec = run.getJobSpecification();
- if (!run.getExecutor().isPredistributed()) {
- appCtx.notifyJobCreation(jobId, spec);
- }
- run.setStatus(JobStatus.RUNNING, null);
- executeJobInternal(run);
- callback.setValue(jobId);
- } catch (Exception e) {
- callback.setException(e);
+ CCApplicationContext appCtx = ccs.getApplicationContext();
+ JobSpecification spec = run.getJobSpecification();
+ if (!run.getExecutor().isPredistributed()) {
+ appCtx.notifyJobCreation(jobId, spec);
}
+ run.setStatus(JobStatus.RUNNING, null);
+ executeJobInternal(run);
+ }
+
+ // Queues a job when the required capacity for the job is not met.
+ private void queueJob(JobRun jobRun) throws HyracksException {
+ jobRun.setStatus(JobStatus.PENDING, null);
+ jobQueue.add(jobRun);
}
private void executeJobInternal(JobRun run) {
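
After this change a job is visible in one of three states, each backed by its own map, and both cancel() and get() consult them in order. A miniature of that three-tier lookup, with plain maps standing in for the real JobManager fields:

import java.util.HashMap;
import java.util.Map;

public class JobLookupSketch {
    static final Map<Integer, String> active = new HashMap<>();   // cf. activeRunMap
    static final Map<Integer, String> pending = new HashMap<>();  // cf. jobQueue
    static final Map<Integer, String> archived = new HashMap<>(); // cf. runMapArchive

    static String get(int jobId) {
        String run = active.get(jobId);   // running job
        if (run == null) {
            run = pending.get(jobId);     // pending job, newly visible after this change
        }
        if (run == null) {
            run = archived.get(jobId);    // completed job
        }
        return run;
    }

    public static void main(String[] args) {
        pending.put(7, "queued-run");
        System.out.println(get(7)); // queued-run
    }
}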
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 3aa9043..55a7a82 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -52,7 +52,6 @@
import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.utils.ExceptionUtils;
-import org.apache.hyracks.control.common.work.IResultCallback;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -99,9 +98,7 @@
private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
- private final IResultCallback<JobId> callback;
-
- private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, IResultCallback<JobId> callback,
+ private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
JobSpecification spec, ActivityClusterGraph acg) {
this.deploymentId = deploymentId;
this.jobId = jobId;
@@ -116,14 +113,13 @@
connectorPolicyMap = new HashMap<>();
operatorLocations = new HashMap<>();
createTime = System.currentTimeMillis();
- this.callback = callback;
}
//Run a Pre-distributed job by passing the JobId
- public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, IResultCallback<JobId> callback,
+ public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
PreDistributedJobDescriptor distributedJobDescriptor)
throws HyracksException {
- this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class), callback,
+ this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class),
distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
this.scheduler = new JobExecutor(ccs, this, constaints, true);
@@ -131,9 +127,8 @@
//Run a new job by creating an ActivityClusterGraph
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
- IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags,
- IResultCallback<JobId> callback) {
- this(deploymentId, jobId, jobFlags, callback, acggf.getJobSpecification(), acgg.initialize());
+ IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags) {
+ this(deploymentId, jobId, jobFlags, acggf.getJobSpecification(), acgg.initialize());
this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false);
}
@@ -196,10 +191,6 @@
return createTime;
}
- public IResultCallback<JobId> getCallback() {
- return callback;
- }
-
public long getStartTime() {
return startTime;
}
@@ -231,13 +222,7 @@
wait();
}
if (exceptions != null && !exceptions.isEmpty()) {
- StringBuilder buffer = new StringBuilder();
- buffer.append("Job failed on account of:\n");
- for (Exception e : exceptions) {
- buffer.append(e.getMessage()).append('\n');
- }
- HyracksException he;
- he = new HyracksException(buffer.toString(), exceptions.get(0));
+ HyracksException he = HyracksException.create(exceptions.get(0));
for (int i = 1; i < exceptions.size(); ++i) {
he.addSuppressed(exceptions.get(i));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 6cf75bb..0377692 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -23,13 +23,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
@@ -42,9 +44,9 @@
public class FIFOJobQueue implements IJobQueue {
private static final Logger LOGGER = Logger.getLogger(FIFOJobQueue.class.getName());
-
private static final int CAPACITY = 4096;
- private final List<JobRun> jobQueue = new LinkedList<>();
+
+ private final Map<JobId, JobRun> jobListMap = new LinkedHashMap<>();
private final IJobManager jobManager;
private final IJobCapacityController jobCapacityController;
@@ -55,17 +57,27 @@
@Override
public void add(JobRun run) throws HyracksException {
- int size = jobQueue.size();
+ int size = jobListMap.size();
if (size >= CAPACITY) {
- throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, new Integer(CAPACITY));
+ throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, CAPACITY);
}
- jobQueue.add(run);
+ jobListMap.put(run.getJobId(), run);
+ }
+
+ @Override
+ public JobRun remove(JobId jobId) {
+ return jobListMap.remove(jobId);
+ }
+
+ @Override
+ public JobRun get(JobId jobId) {
+ return jobListMap.get(jobId);
}
@Override
public List<JobRun> pull() {
List<JobRun> jobRuns = new ArrayList<>();
- Iterator<JobRun> runIterator = jobQueue.iterator();
+ Iterator<JobRun> runIterator = jobListMap.values().iterator();
while (runIterator.hasNext()) {
JobRun run = runIterator.next();
JobSpecification job = run.getJobSpecification();
@@ -89,7 +101,6 @@
} catch (HyracksException e) {
LOGGER.log(Level.SEVERE, e.getMessage(), e);
}
- continue;
}
}
return jobRuns;
@@ -97,7 +108,7 @@
@Override
public Collection<JobRun> jobs() {
- return Collections.unmodifiableCollection(jobQueue);
+ return Collections.unmodifiableCollection(jobListMap.values());
}
}
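
Replacing the LinkedList with a LinkedHashMap keeps FIFO iteration order for pull() while giving the new remove() O(1) keyed access, which cancelling a pending job relies on. A standalone demonstration of that property:

import java.util.LinkedHashMap;
import java.util.Map;

public class FifoMapSketch {
    public static void main(String[] args) {
        Map<Integer, String> queue = new LinkedHashMap<>();
        queue.put(1, "job-1");
        queue.put(2, "job-2");
        queue.put(3, "job-3");
        queue.remove(2); // keyed removal, as when a pending job is cancelled
        System.out.println(queue.values()); // [job-1, job-3]: FIFO order preserved
    }
}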
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
index 2c26799..e666224 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.job.JobRun;
/**
@@ -41,6 +42,22 @@
void add(JobRun run) throws HyracksException;
/**
+ * Removes a job with a given jobId from the job queue.
+ *
+ * @param jobId,
+ * the job id of the job to be removed.
+ */
+ JobRun remove(JobId jobId);
+
+ /**
+ * Retrieves a job with a given jobId from the job queue.
+ *
+ * @param jobId,
+ * the job id of the job to be retrieved.
+ */
+ JobRun get(JobId jobId);
+
+ /**
* Pull a list of jobs from the job queue when more cluster capacity becomes available.
*
* @return a list of jobs whose capacity requirements can all be met at the same time.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
new file mode 100644
index 0000000..f3b67c9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+/**
+ * This work cancels a job with the given job id.
+ * It is triggered by a cancel request from the client.
+ */
+public class CancelJobWork extends SynchronizableWork {
+ private final IJobManager jobManager;
+ private final JobId jobId;
+ private final IResultCallback<Void> callback;
+
+ public CancelJobWork(IJobManager jobManager, JobId jobId, IResultCallback<Void> callback) {
+ this.jobId = jobId;
+ this.jobManager = jobManager;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ if (jobId != null) {
+ jobManager.cancel(jobId);
+ }
+ callback.setValue(null);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index c608712..1253cf7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -65,14 +65,14 @@
.deserialize(acggfBytes, deploymentId, appCtx);
IActivityClusterGraphGenerator acgg =
acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
- run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback);
+ run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
} else {
//ActivityClusterGraph has already been distributed
- run = new JobRun(ccs, deploymentId, jobId, callback,
+ run = new JobRun(ccs, deploymentId, jobId,
ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
}
jobManager.add(run);
-
+ callback.setValue(jobId);
} catch (Exception e) {
callback.setException(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 88b8939..3bb08bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -199,6 +199,61 @@
Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
}
+ @Test
+ public void testCancel() throws HyracksException {
+ CCConfig ccConfig = new CCConfig();
+ IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
+ IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
+
+ // Submits runnable jobs.
+ List<JobRun> acceptedRuns = new ArrayList<>();
+ for (int id = 0; id < 4096; ++id) {
+ // Mocks an immediately executable job.
+ JobRun run = mockJobRun(id);
+ JobSpecification job = mock(JobSpecification.class);
+ when(run.getJobSpecification()).thenReturn(job);
+ when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+
+ // Submits the job.
+ acceptedRuns.add(run);
+ jobManager.add(run);
+ Assert.assertTrue(jobManager.getRunningJobs().size() == id + 1);
+ Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
+ }
+
+ // Submits jobs that will be deferred due to the capacity limitation.
+ List<JobRun> deferredRuns = new ArrayList<>();
+ for (int id = 4096; id < 8192; ++id) {
+ // Mocks a deferred job.
+ JobRun run = mockJobRun(id);
+ JobSpecification job = mock(JobSpecification.class);
+ when(run.getJobSpecification()).thenReturn(job);
+ when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+ .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+
+ // Submits the job.
+ deferredRuns.add(run);
+ jobManager.add(run);
+ Assert.assertTrue(jobManager.getRunningJobs().size() == 4096);
+ Assert.assertTrue(jobManager.getPendingJobs().size() == id + 1 - 4096);
+ }
+
+ // Cancels deferred jobs.
+ for (JobRun run : deferredRuns) {
+ jobManager.cancel(run.getJobId());
+ }
+
+ // Cancels runnable jobs.
+ for (JobRun run : acceptedRuns) {
+ jobManager.cancel(run.getJobId());
+ }
+
+ Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
+ Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+ verify(jobManager, times(0)).prepareComplete(any(), any(), any());
+ verify(jobManager, times(0)).finalComplete(any());
+ }
+
private JobRun mockJobRun(long id) {
JobRun run = mock(JobRun.class, Mockito.RETURNS_DEEP_STUBS);
when(run.getExceptions()).thenReturn(Collections.emptyList());
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 4163e46..3d6ac00 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -26,8 +26,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -35,9 +36,11 @@
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -46,12 +49,14 @@
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
public abstract class AbstractMultiNCIntegrationTest {
private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
@@ -88,6 +93,7 @@
ccRoot.delete();
ccRoot.mkdir();
ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ ccConfig.appCCMainClass = DummyApplicationEntryPoint.class.getName();
cc = new ClusterControllerService(ccConfig);
cc.start();
@@ -122,6 +128,18 @@
cc.stop();
}
+ protected JobId startJob(JobSpecification spec) throws Exception {
+ return hcc.startJob(spec);
+ }
+
+ protected void waitForCompletion(JobId jobId) throws Exception {
+ hcc.waitForCompletion(jobId);
+ }
+
+ protected void cancelJob(JobId jobId) throws Exception {
+ hcc.cancelJob(jobId);
+ }
+
protected void runTest(JobSpecification spec) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(spec.toJSON().asText());
@@ -201,4 +219,40 @@
return tempFile;
}
+ public static class DummyApplicationEntryPoint implements ICCApplicationEntryPoint {
+
+ @Override
+ public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+
+ }
+
+ @Override
+ public void stop() throws Exception {
+
+ }
+
+ @Override
+ public void startupCompleted() throws Exception {
+
+ }
+
+ @Override
+ public IJobCapacityController getJobCapacityController() {
+ return new IJobCapacityController() {
+ private long maxRAM = Runtime.getRuntime().maxMemory();
+
+ @Override
+ public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
+ return maxRAM > job.getRequiredClusterCapacity().getAggregatedMemoryByteSize()
+ ? JobSubmissionStatus.EXECUTE : JobSubmissionStatus.QUEUE;
+ }
+
+ @Override
+ public void release(JobSpecification job) {
+
+ }
+ };
+ }
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
new file mode 100644
index 0000000..7c3b66f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.integration;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.ManagedFileSplit;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CancelJobTest extends AbstractMultiNCIntegrationTest {
+
+ @Test
+ public void cancelExecutingJobAfterWaitForCompletion() throws Exception {
+ //Cancels executing jobs after waitForCompletion() is called.
+ for (JobSpecification spec : testJobs()) {
+ cancelAfterWaitForCompletion(spec);
+ }
+ }
+
+ @Test
+ public void cancelExecutingJobBeforeWaitForCompletion() throws Exception {
+ //Cancels executing jobs before waitForCompletion is called.
+ for (JobSpecification spec : testJobs()) {
+ cancelBeforeWaitForCompletion(spec);
+ }
+ }
+
+ @Test
+ public void cancelExecutingJobWithoutWaitForCompletion() throws Exception {
+ //Cancels executing jobs without calling waitForCompletion.
+ for (JobSpecification spec : testJobs()) {
+ cancelWithoutWait(spec);
+ }
+ }
+
+ @Test
+ public void cancelPendingJobAfterWaitForCompletion() throws Exception {
+ //Cancels pending jobs after waitForCompletion() is called.
+ for (JobSpecification spec : testJobs()) {
+ setJobCapacity(spec);
+ cancelAfterWaitForCompletion(spec);
+ }
+ }
+
+ @Test
+ public void cancelPendingJobBeforeWaitForCompletion() throws Exception {
+ //Cancels pending jobs before waitForCompletion is called.
+ for (JobSpecification spec : testJobs()) {
+ setJobCapacity(spec);
+ cancelBeforeWaitForCompletion(spec);
+ }
+ }
+
+ @Test
+ public void cancelPendingJobWithoutWaitForCompletion() throws Exception {
+ //Cancels pending jobs without calling waitForCompletion.
+ for (JobSpecification spec : testJobs()) {
+ setJobCapacity(spec);
+ cancelWithoutWait(spec);
+ }
+ }
+
+ private JobSpecification[] testJobs() {
+ return new JobSpecification[] { jobWithSleepSourceOp(), jobWithSleepOp() };
+ }
+
+ private void setJobCapacity(JobSpecification spec) {
+ IClusterCapacity reqCapacity = new ClusterCapacity();
+ reqCapacity.setAggregatedMemoryByteSize(Long.MAX_VALUE);
+ spec.setRequiredClusterCapacity(reqCapacity);
+ }
+
+ private void cancelAfterWaitForCompletion(JobSpecification spec) throws Exception {
+ JobId jobId = startJob(spec);
+ // A thread for canceling the job.
+ Thread thread = new Thread(() -> {
+ try {
+ synchronized (this) {
+ this.wait(500); // Make sure waitForCompletion is called first.
+ }
+ cancelJob(jobId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ // Cancels the job.
+ thread.start();
+
+ // Checks the resulting Exception.
+ boolean exceptionMatched = false;
+ try {
+ waitForCompletion(jobId);
+ } catch (Exception e) {
+ exceptionMatched = true;
+ Assert.assertTrue(e instanceof HyracksException);
+ HyracksException hyracksException = (HyracksException) e;
+ Assert.assertTrue(hyracksException.getErrorCode() == ErrorCode.JOB_CANCELED);
+ } finally {
+ Assert.assertTrue(exceptionMatched);
+ }
+ thread.join();
+ }
+
+ private void cancelBeforeWaitForCompletion(JobSpecification spec) throws Exception {
+ boolean exceptionMatched = false;
+ try {
+ JobId jobId = startJob(spec);
+ cancelJob(jobId);
+ waitForCompletion(jobId);
+ } catch (HyracksException e) {
+ exceptionMatched = true;
+ Assert.assertTrue(e.getErrorCode() == ErrorCode.JOB_CANCELED);
+ } finally {
+ Assert.assertTrue(exceptionMatched);
+ }
+ }
+
+ private void cancelWithoutWait(JobSpecification spec) throws Exception {
+ JobId jobId = startJob(spec);
+ cancelJob(jobId);
+ }
+
+ private JobSpecification jobWithSleepSourceOp() {
+ JobSpecification spec = new JobSpecification();
+ SleepSourceOperatorDescriptor sourceOpDesc = new SleepSourceOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS);
+ SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS);
+ IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
+ spec.addRoot(sinkOpDesc);
+ return spec;
+ }
+
+ private JobSpecification jobWithSleepOp() {
+ JobSpecification spec = new JobSpecification();
+ FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(ASTERIX_IDS[0],
+ "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl") };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor recordDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
+
+ // File scan operator.
+ FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
+ recordDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, ASTERIX_IDS[0]);
+
+ // Sleep operator.
+ SleepOperatorDescriptor sleepOp = new SleepOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sleepOp, ASTERIX_IDS);
+
+ // Sink operator.
+ SinkOperatorDescriptor sinkOp = new SinkOperatorDescriptor(spec, 1);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOp, ASTERIX_IDS);
+
+ // Hash-repartitioning connector.
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ spec.connect(conn1, scanOp, 0, sleepOp, 0);
+
+ // One-to-one connector.
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, sleepOp, 0, sinkOp, 0);
+ return spec;
+ }
+
+}
+
+class SleepSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public SleepSourceOperatorDescriptor(JobSpecification spec) {
+ super(spec, 0, 1);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ try {
+ writer.open();
+ while (true) {
+ synchronized (this) {
+ wait();
+ }
+ }
+ } catch (Exception e) {
+ writer.fail();
+ } finally {
+ writer.close();
+ }
+ }
+ };
+ }
+}
+
+class SleepOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public SleepOperatorDescriptor(JobSpecification spec) {
+ super(spec, 1, 1);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ while (true) {
+ synchronized (this) {
+ wait();
+ }
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+ };
+ }
+}
\ No newline at end of file