Renamed Job to Work
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@688 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 7925f0d..5e4121e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -95,7 +95,7 @@
private final Map<JobId, JobRun> runMap;
- private final WorkQueue jobQueue;
+ private final WorkQueue workQueue;
private final Executor taskExecutor;
@@ -117,7 +117,7 @@
taskExecutor = Executors.newCachedThreadPool();
webServer = new WebServer(this);
runMap = new HashMap<JobId, JobRun>();
- jobQueue = new WorkQueue();
+ workQueue = new WorkQueue();
this.timer = new Timer(true);
ccci = new CCClientInterface(this);
ccContext = new ICCContext() {
@@ -158,8 +158,8 @@
return runMap;
}
- public WorkQueue getJobQueue() {
- return jobQueue;
+ public WorkQueue getWorkQueue() {
+ return workQueue;
}
public Executor getExecutor() {
@@ -186,7 +186,7 @@
public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
JobId jobId = createJobId();
JobCreateWork jce = new JobCreateWork(this, jobId, appName, jobSpec, jobFlags);
- jobQueue.schedule(jce);
+ workQueue.schedule(jce);
jce.sync();
return jobId;
}
@@ -196,7 +196,7 @@
INodeController nodeController = reg.getNodeController();
String id = reg.getNodeId();
NodeControllerState state = new NodeControllerState(nodeController, reg);
- jobQueue.scheduleAndSync(new RegisterNodeWork(this, id, state));
+ workQueue.scheduleAndSync(new RegisterNodeWork(this, id, state));
nodeController.notifyRegistration(this);
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
NodeParameters params = new NodeParameters();
@@ -209,7 +209,7 @@
@Override
public void unregisterNode(INodeController nodeController) throws Exception {
String id = nodeController.getId();
- jobQueue.scheduleAndSync(new UnregisterNodeWork(this, id));
+ workQueue.scheduleAndSync(new UnregisterNodeWork(this, id));
LOGGER.log(Level.INFO, "Unregistered INodeController");
}
@@ -217,33 +217,33 @@
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
TaskCompleteWork sce = new TaskCompleteWork(this, jobId, taskId, nodeId);
- jobQueue.schedule(sce);
+ workQueue.schedule(sce);
}
@Override
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, Exception exception)
throws Exception {
TaskFailureWork tfe = new TaskFailureWork(this, jobId, taskId, nodeId, exception);
- jobQueue.schedule(tfe);
+ workQueue.schedule(tfe);
}
@Override
public JobStatus getJobStatus(JobId jobId) throws Exception {
GetJobStatusWork gse = new GetJobStatusWork(this, jobId);
- jobQueue.scheduleAndSync(gse);
+ workQueue.scheduleAndSync(gse);
return gse.getStatus();
}
@Override
public void start(JobId jobId) throws Exception {
JobStartWork jse = new JobStartWork(this, jobId);
- jobQueue.schedule(jse);
+ workQueue.schedule(jse);
}
@Override
public void waitForCompletion(JobId jobId) throws Exception {
GetJobStatusConditionVariableWork e = new GetJobStatusConditionVariableWork(this, jobId);
- jobQueue.scheduleAndSync(e);
+ workQueue.scheduleAndSync(e);
IJobStatusConditionVariable var = e.getConditionVariable();
if (var != null) {
var.waitForCompletion();
@@ -252,12 +252,12 @@
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
- jobQueue.schedule(new ReportProfilesWork(this, profiles));
+ workQueue.schedule(new ReportProfilesWork(this, profiles));
}
@Override
public synchronized void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
- jobQueue.schedule(new NodeHeartbeatWork(this, id, hbData));
+ workQueue.schedule(new NodeHeartbeatWork(this, id, hbData));
}
@Override
@@ -274,14 +274,14 @@
@Override
public void destroyApplication(String appName) throws Exception {
FutureValue fv = new FutureValue();
- jobQueue.schedule(new ApplicationDestroyWork(this, appName, fv));
+ workQueue.schedule(new ApplicationDestroyWork(this, appName, fv));
fv.get();
}
@Override
public void startApplication(final String appName) throws Exception {
FutureValue fv = new FutureValue();
- jobQueue.schedule(new ApplicationStartWork(this, appName, fv));
+ workQueue.schedule(new ApplicationStartWork(this, appName, fv));
fv.get();
}
@@ -292,18 +292,18 @@
@Override
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) {
- jobQueue.schedule(new RegisterPartitionAvailibilityWork(this, partitionDescriptor));
+ workQueue.schedule(new RegisterPartitionAvailibilityWork(this, partitionDescriptor));
}
@Override
public void registerPartitionRequest(PartitionRequest partitionRequest) {
- jobQueue.schedule(new RegisterPartitionRequestWork(this, partitionRequest));
+ workQueue.schedule(new RegisterPartitionRequestWork(this, partitionRequest));
}
private class DeadNodeSweeper extends TimerTask {
@Override
public void run() {
- jobQueue.schedule(new RemoveDeadNodesWork(ClusterControllerService.this));
+ workQueue.schedule(new RemoveDeadNodesWork(ClusterControllerService.this));
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 14489a6..e86741e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -189,7 +189,7 @@
+ inProgressTaskClusters);
}
if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
- ccs.getJobQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
+ ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
return;
}
startRunnableTaskClusters(taskClusterRoots);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
index c10f5ce..73f3a1a0 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
@@ -68,7 +68,7 @@
}
OutputStreamGetter r = new OutputStreamGetter();
try {
- ccs.getJobQueue().scheduleAndSync(r);
+ ccs.getWorkQueue().scheduleAndSync(r);
} catch (Exception e) {
throw new IOException(e);
}
@@ -92,7 +92,7 @@
}
InputStreamGetter r = new InputStreamGetter();
try {
- ccs.getJobQueue().scheduleAndSync(r);
+ ccs.getWorkQueue().scheduleAndSync(r);
} catch (Exception e) {
throw new IOException(e);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
index 8141dbc..4135b18 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
@@ -41,7 +41,7 @@
}
case 0: {
GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs);
- ccs.getJobQueue().scheduleAndSync(gjse);
+ ccs.getWorkQueue().scheduleAndSync(gjse);
result.put("result", gjse.getSummaries());
break;
}
@@ -51,15 +51,15 @@
if ("job-specification".equalsIgnoreCase(arguments[1])) {
GetJobSpecificationJSONWork gjse = new GetJobSpecificationJSONWork(ccs, jobId);
- ccs.getJobQueue().scheduleAndSync(gjse);
+ ccs.getWorkQueue().scheduleAndSync(gjse);
result.put("result", gjse.getJSON());
} else if ("job-activity-graph".equalsIgnoreCase(arguments[1])) {
GetJobActivityGraphJSONWork gjage = new GetJobActivityGraphJSONWork(ccs, jobId);
- ccs.getJobQueue().scheduleAndSync(gjage);
+ ccs.getWorkQueue().scheduleAndSync(gjage);
result.put("result", gjage.getJSON());
} else if ("job-run".equalsIgnoreCase(arguments[1])) {
GetJobRunJSONWork gjre = new GetJobRunJSONWork(ccs, jobId);
- ccs.getJobQueue().scheduleAndSync(gjre);
+ ccs.getWorkQueue().scheduleAndSync(gjre);
result.put("result", gjre.getJSON());
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/NodesRESTAPIFunction.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/NodesRESTAPIFunction.java
index 3e5dd7a..b8742c8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/NodesRESTAPIFunction.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/NodesRESTAPIFunction.java
@@ -35,12 +35,12 @@
case 1: {
if ("".equals(arguments[0])) {
GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs);
- ccs.getJobQueue().scheduleAndSync(gnse);
+ ccs.getWorkQueue().scheduleAndSync(gnse);
result.put("result", gnse.getSummaries());
} else {
String nodeId = arguments[0];
GetNodeDetailsJSONWork gnde = new GetNodeDetailsJSONWork(ccs, nodeId);
- ccs.getJobQueue().scheduleAndSync(gnde);
+ ccs.getWorkQueue().scheduleAndSync(gnde);
result.put("result", gnde.getDetail());
}
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
index 94873d3..de55847 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
@@ -58,7 +58,7 @@
fv.setException(e);
return;
}
- ccs.getJobQueue().schedule(new AbstractWork() {
+ ccs.getWorkQueue().schedule(new AbstractWork() {
@Override
public void run() {
try {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 0d87b0d..1af4993 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -63,7 +63,7 @@
e.printStackTrace();
}
}
- ccs.getJobQueue().schedule(new AbstractWork() {
+ ccs.getWorkQueue().schedule(new AbstractWork() {
@Override
public void run() {
CCApplicationContext appCtx = ccs.getApplicationMap().get(
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index 0fa5710..d195cab 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -42,7 +42,7 @@
try {
run.getScheduler().startJob();
} catch (Exception e) {
- ccs.getJobQueue().schedule(new JobCleanupWork(ccs, run.getJobId(), JobStatus.FAILURE, e));
+ ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, run.getJobId(), JobStatus.FAILURE, e));
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index 2fc172e..136cd5b 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -21,11 +21,11 @@
private static final Logger LOGGER = Logger.getLogger(WorkQueue.class.getName());
private final LinkedBlockingQueue<AbstractWork> queue;
- private final JobThread thread;
+ private final WorkerThread thread;
public WorkQueue() {
queue = new LinkedBlockingQueue<AbstractWork>();
- thread = new JobThread();
+ thread = new WorkerThread();
thread.start();
}
@@ -41,8 +41,8 @@
sRunnable.sync();
}
- private class JobThread extends Thread {
- JobThread() {
+ private class WorkerThread extends Thread {
+ WorkerThread() {
setDaemon(true);
}