Added waiting to job cleanup for all joblets to terminate
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@845 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 7c1f790..bcc3c30 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -51,6 +51,7 @@
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
import edu.uci.ics.hyracks.control.cc.work.JobCreateWork;
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
+import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
@@ -254,6 +255,12 @@
}
@Override
+ public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+ JobletCleanupNotificationWork jcnw = new JobletCleanupNotificationWork(this, jobId, nodeId);
+ workQueue.schedule(jcnw);
+ }
+
+ @Override
public JobStatus getJobStatus(JobId jobId) throws Exception {
GetJobStatusWork gse = new GetJobStatusWork(this, jobId);
workQueue.scheduleAndSync(gse);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 48a6e0d..df003b2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -47,6 +47,8 @@
private final Set<String> participatingNodeIds;
+ private final Set<String> cleanupPendingNodeIds;
+
private final JobProfile profile;
private Set<ActivityCluster> activityClusters;
@@ -67,11 +69,16 @@
private Exception exception;
+ private JobStatus pendingStatus;
+
+ private Exception pendingException;
+
public JobRun(JobId jobId, JobActivityGraph plan) {
this.jobId = jobId;
this.jag = plan;
pmm = new PartitionMatchMaker();
participatingNodeIds = new HashSet<String>();
+ cleanupPendingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
@@ -99,6 +106,23 @@
return status;
}
+ public synchronized Exception getException() {
+ return exception;
+ }
+
+ public void setPendingStatus(JobStatus status, Exception exception) {
+ this.pendingStatus = status;
+ this.pendingException = exception;
+ }
+
+ public JobStatus getPendingStatus() {
+ return pendingStatus;
+ }
+
+ public synchronized Exception getPendingException() {
+ return pendingException;
+ }
+
public long getCreateTime() {
return createTime;
}
@@ -123,10 +147,6 @@
this.endTime = endTime;
}
- public synchronized Exception getException() {
- return exception;
- }
-
@Override
public synchronized void waitForCompletion() throws Exception {
while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
@@ -141,6 +161,10 @@
return participatingNodeIds;
}
+ public Set<String> getCleanupPendingNodeIds() {
+ return cleanupPendingNodeIds;
+ }
+
public JobProfile getJobProfile() {
return profile;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
index 548c963..d0f9151 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
@@ -32,7 +32,7 @@
@Override
public Void execute(INodeController node) throws Exception {
- node.cleanUpJob(jobId, status);
+ node.cleanUpJoblet(jobId, status);
return null;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 006874b..321b6b8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -16,16 +16,9 @@
import java.util.Set;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
import edu.uci.ics.hyracks.control.cc.remote.ops.JobCompleteNotifier;
@@ -48,14 +41,12 @@
public void run() {
final JobRun run = ccs.getActiveRunMap().get(jobId);
Set<String> targetNodes = run.getParticipatingNodeIds();
+ run.getCleanupPendingNodeIds().addAll(targetNodes);
+ run.setPendingStatus(status, exception);
final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
int i = 0;
for (String n : targetNodes) {
jcns[i++] = new JobCompleteNotifier(n, jobId, status);
- NodeControllerState ncs = ccs.getNodeMap().get(n);
- if (ncs != null) {
- ncs.getActiveJobIds().remove(jobId);
- }
}
ccs.getExecutor().execute(new Runnable() {
@Override
@@ -67,41 +58,6 @@
e.printStackTrace();
}
}
- ccs.getWorkQueue().schedule(new AbstractWork() {
- @Override
- public void run() {
- CCApplicationContext appCtx = ccs.getApplicationMap().get(
- run.getJobActivityGraph().getApplicationName());
- if (appCtx != null) {
- try {
- appCtx.notifyJobFinish(jobId);
- } catch (HyracksException e) {
- e.printStackTrace();
- }
- }
- run.setStatus(status, exception);
- ccs.getActiveRunMap().remove(jobId);
- ccs.getRunMapArchive().put(jobId, run);
- try {
- ccs.getJobLogFile().log(createJobLogObject(run));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private JSONObject createJobLogObject(final JobRun run) {
- JSONObject jobLogObject = new JSONObject();
- try {
- JobActivityGraph jag = run.getJobActivityGraph();
- jobLogObject.put("job-specification", jag.getJobSpecification().toJSON());
- jobLogObject.put("job-activity-graph", jag.toJSON());
- jobLogObject.put("job-run", run.toJSON());
- } catch (JSONException e) {
- throw new RuntimeException(e);
- }
- return jobLogObject;
- }
- });
}
});
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
new file mode 100644
index 0000000..80ab5ec
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class JobletCleanupNotificationWork extends AbstractWork {
+ private static final Logger LOGGER = Logger.getLogger(JobletCleanupNotificationWork.class.getName());
+
+ private ClusterControllerService ccs;
+ private JobId jobId;
+ private String nodeId;
+
+ public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId jobId, String nodeId) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public void run() {
+ final JobRun run = ccs.getActiveRunMap().get(jobId);
+ Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
+ if (!cleanupPendingNodes.remove(nodeId)) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(nodeId + " not in pending cleanup nodes set: " + cleanupPendingNodes + " for Job: "
+ + jobId);
+ }
+ return;
+ }
+ NodeControllerState ncs = ccs.getNodeMap().get(nodeId);
+ if (ncs != null) {
+ ncs.getActiveJobIds().remove(jobId);
+ }
+ if (cleanupPendingNodes.isEmpty()) {
+ CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getJobActivityGraph().getApplicationName());
+ if (appCtx != null) {
+ try {
+ appCtx.notifyJobFinish(jobId);
+ } catch (HyracksException e) {
+ e.printStackTrace();
+ }
+ }
+ run.setStatus(run.getPendingStatus(), run.getPendingException());
+ ccs.getActiveRunMap().remove(jobId);
+ ccs.getRunMapArchive().put(jobId, run);
+ try {
+ ccs.getJobLogFile().log(createJobLogObject(run));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ }
+
+ private JSONObject createJobLogObject(final JobRun run) {
+ JSONObject jobLogObject = new JSONObject();
+ try {
+ JobActivityGraph jag = run.getJobActivityGraph();
+ jobLogObject.put("job-specification", jag.getJobSpecification().toJSON());
+ jobLogObject.put("job-activity-graph", jag.toJSON());
+ jobLogObject.put("job-run", run.toJSON());
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ return jobLogObject;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 15bee45..53003d0 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -37,6 +37,8 @@
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception;
+ public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
+
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 313f92e..fdf49e9 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -38,7 +38,7 @@
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
- public void cleanUpJob(JobId jobId, JobStatus status) throws Exception;
+ public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
public void notifyRegistration(IClusterController ccs) throws Exception;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 05590e4..1a98ebd 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.api.job.IJobletEventListener;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -75,6 +76,10 @@
private IJobletEventListener jobletEventListener;
+ private JobStatus cleanupStatus;
+
+ private boolean cleanupPending;
+
public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx) {
this.nodeController = nodeController;
this.appCtx = appCtx;
@@ -86,6 +91,7 @@
counterMap = new HashMap<String, Counter>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
+ cleanupPending = false;
}
@Override
@@ -140,16 +146,28 @@
public synchronized void notifyTaskComplete(Task task) throws Exception {
taskMap.remove(task);
- TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
- task.dumpProfile(taskProfile);
- nodeController.getClusterController().notifyTaskComplete(jobId, task.getTaskAttemptId(),
- nodeController.getId(), taskProfile);
+ try {
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
+ task.dumpProfile(taskProfile);
+ nodeController.getClusterController().notifyTaskComplete(jobId, task.getTaskAttemptId(),
+ nodeController.getId(), taskProfile);
+ } finally {
+ if (cleanupPending && taskMap.isEmpty()) {
+ performCleanup();
+ }
+ }
}
public synchronized void notifyTaskFailed(Task task, String details) throws Exception {
taskMap.remove(task);
- nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), nodeController.getId(),
- details);
+ try {
+ nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(),
+ nodeController.getId(), details);
+ } finally {
+ if (cleanupPending && taskMap.isEmpty()) {
+ performCleanup();
+ }
+ }
}
public NodeControllerService getNodeController() {
@@ -246,4 +264,22 @@
public void setJobletEventListener(IJobletEventListener jobletEventListener) {
this.jobletEventListener = jobletEventListener;
}
+
+ public synchronized void cleanup(JobStatus status) {
+ cleanupStatus = status;
+ cleanupPending = true;
+ if (taskMap.isEmpty()) {
+ performCleanup();
+ }
+ }
+
+ private void performCleanup() {
+ nodeController.getJobletMap().remove(jobId);
+ IJobletEventListener listener = getJobletEventListener();
+ if (listener != null) {
+ listener.jobletFinish(cleanupStatus);
+ }
+ close();
+ cleanupPending = false;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index f535f3e..3fd10a8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -69,7 +69,7 @@
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
-import edu.uci.ics.hyracks.control.nc.work.CleanupJobWork;
+import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
@@ -242,8 +242,8 @@
}
@Override
- public void cleanUpJob(JobId jobId, JobStatus status) throws Exception {
- CleanupJobWork cjw = new CleanupJobWork(this, jobId, status);
+ public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
+ CleanupJobletWork cjw = new CleanupJobletWork(this, jobId, status);
queue.scheduleAndSync(cjw);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 4a4f3f8..68061f1 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -19,7 +19,9 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Hashtable;
+import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
@@ -69,6 +71,8 @@
private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+ private final Set<Thread> pendingThreads;
+
private IPartitionCollector[] collectors;
private IOperatorNodePushable operator;
@@ -92,6 +96,7 @@
opEnv = joblet.getEnvironment(taskId.getTaskId().getActivityId().getOperatorDescriptorId(), taskId.getTaskId()
.getPartition());
partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
+ pendingThreads = new LinkedHashSet<Thread>();
failed = false;
errorBaos = new ByteArrayOutputStream();
errorWriter = new PrintWriter(errorBaos, true);
@@ -181,17 +186,38 @@
joblet.getExecutor().execute(this);
}
- public void abort() {
+ public synchronized void abort() {
aborted = true;
for (IPartitionCollector c : collectors) {
c.abort();
}
+ for (Thread t : pendingThreads) {
+ t.interrupt();
+ }
+ }
+
+ private synchronized void addPendingThread(Thread t) {
+ pendingThreads.add(t);
+ }
+
+ private synchronized void removePendingThread(Thread t) {
+ pendingThreads.remove(t);
+ if (pendingThreads.isEmpty()) {
+ notifyAll();
+ }
+ }
+
+ public synchronized void waitForCompletion() throws InterruptedException {
+ while (!pendingThreads.isEmpty()) {
+ wait();
+ }
}
@Override
public void run() {
Thread ct = Thread.currentThread();
String threadName = ct.getName();
+ addPendingThread(ct);
try {
ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
operator.initialize();
@@ -205,7 +231,11 @@
final int cIdx = i;
executor.execute(new Runnable() {
public void run() {
+ if (aborted) {
+ return;
+ }
Thread thread = Thread.currentThread();
+ addPendingThread(thread);
String oldName = thread.getName();
thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
try {
@@ -220,6 +250,7 @@
} finally {
thread.setName(oldName);
sem.release();
+ removePendingThread(thread);
}
}
});
@@ -242,6 +273,7 @@
} finally {
ct.setName(threadName);
close();
+ removePendingThread(ct);
}
if (failed) {
errorWriter.close();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
similarity index 74%
rename from hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
rename to hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index 7fbc603..4e51142 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -18,15 +18,14 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.hyracks.api.job.IJobletEventListener;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-public class CleanupJobWork extends SynchronizableWork {
- private static final Logger LOGGER = Logger.getLogger(CleanupJobWork.class.getName());
+public class CleanupJobletWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(CleanupJobletWork.class.getName());
private final NodeControllerService ncs;
@@ -34,7 +33,7 @@
private JobStatus status;
- public CleanupJobWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
+ public CleanupJobletWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
this.ncs = ncs;
this.jobId = jobId;
this.status = status;
@@ -45,15 +44,12 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
+ ncs.getPartitionManager().unregisterPartitions(jobId);
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
- Joblet joblet = jobletMap.remove(jobId);
+ Joblet joblet = jobletMap.get(jobId);
if (joblet != null) {
- IJobletEventListener listener = joblet.getJobletEventListener();
- if (listener != null) {
- listener.jobletFinish(status);
- }
- ncs.getPartitionManager().unregisterPartitions(jobId);
- joblet.close();
+ joblet.cleanup(status);
}
+ ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
}
}
\ No newline at end of file