Added waiting to job cleanup for all joblets to terminate

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@845 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 7c1f790..bcc3c30 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -51,6 +51,7 @@
 import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.JobCreateWork;
 import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
+import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
@@ -254,6 +255,12 @@
     }
 
     @Override
+    public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+        JobletCleanupNotificationWork jcnw = new JobletCleanupNotificationWork(this, jobId, nodeId);
+        workQueue.schedule(jcnw);
+    }
+
+    @Override
     public JobStatus getJobStatus(JobId jobId) throws Exception {
         GetJobStatusWork gse = new GetJobStatusWork(this, jobId);
         workQueue.scheduleAndSync(gse);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 48a6e0d..df003b2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -47,6 +47,8 @@
 
     private final Set<String> participatingNodeIds;
 
+    private final Set<String> cleanupPendingNodeIds;
+
     private final JobProfile profile;
 
     private Set<ActivityCluster> activityClusters;
@@ -67,11 +69,16 @@
 
     private Exception exception;
 
+    private JobStatus pendingStatus;
+
+    private Exception pendingException;
+
     public JobRun(JobId jobId, JobActivityGraph plan) {
         this.jobId = jobId;
         this.jag = plan;
         pmm = new PartitionMatchMaker();
         participatingNodeIds = new HashSet<String>();
+        cleanupPendingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
         activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
         connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
@@ -99,6 +106,23 @@
         return status;
     }
 
+    public synchronized Exception getException() {
+        return exception;
+    }
+
+    public void setPendingStatus(JobStatus status, Exception exception) {
+        this.pendingStatus = status;
+        this.pendingException = exception;
+    }
+
+    public JobStatus getPendingStatus() {
+        return pendingStatus;
+    }
+
+    public synchronized Exception getPendingException() {
+        return pendingException;
+    }
+
     public long getCreateTime() {
         return createTime;
     }
@@ -123,10 +147,6 @@
         this.endTime = endTime;
     }
 
-    public synchronized Exception getException() {
-        return exception;
-    }
-
     @Override
     public synchronized void waitForCompletion() throws Exception {
         while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
@@ -141,6 +161,10 @@
         return participatingNodeIds;
     }
 
+    public Set<String> getCleanupPendingNodeIds() {
+        return cleanupPendingNodeIds;
+    }
+
     public JobProfile getJobProfile() {
         return profile;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
index 548c963..d0f9151 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
@@ -32,7 +32,7 @@
 
     @Override
     public Void execute(INodeController node) throws Exception {
-        node.cleanUpJob(jobId, status);
+        node.cleanUpJoblet(jobId, status);
         return null;
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 006874b..321b6b8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -16,16 +16,9 @@
 
 import java.util.Set;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
 import edu.uci.ics.hyracks.control.cc.remote.ops.JobCompleteNotifier;
@@ -48,14 +41,12 @@
     public void run() {
         final JobRun run = ccs.getActiveRunMap().get(jobId);
         Set<String> targetNodes = run.getParticipatingNodeIds();
+        run.getCleanupPendingNodeIds().addAll(targetNodes);
+        run.setPendingStatus(status, exception);
         final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
         int i = 0;
         for (String n : targetNodes) {
             jcns[i++] = new JobCompleteNotifier(n, jobId, status);
-            NodeControllerState ncs = ccs.getNodeMap().get(n);
-            if (ncs != null) {
-                ncs.getActiveJobIds().remove(jobId);
-            }
         }
         ccs.getExecutor().execute(new Runnable() {
             @Override
@@ -67,41 +58,6 @@
                         e.printStackTrace();
                     }
                 }
-                ccs.getWorkQueue().schedule(new AbstractWork() {
-                    @Override
-                    public void run() {
-                        CCApplicationContext appCtx = ccs.getApplicationMap().get(
-                                run.getJobActivityGraph().getApplicationName());
-                        if (appCtx != null) {
-                            try {
-                                appCtx.notifyJobFinish(jobId);
-                            } catch (HyracksException e) {
-                                e.printStackTrace();
-                            }
-                        }
-                        run.setStatus(status, exception);
-                        ccs.getActiveRunMap().remove(jobId);
-                        ccs.getRunMapArchive().put(jobId, run);
-                        try {
-                            ccs.getJobLogFile().log(createJobLogObject(run));
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-
-                    private JSONObject createJobLogObject(final JobRun run) {
-                        JSONObject jobLogObject = new JSONObject();
-                        try {
-                            JobActivityGraph jag = run.getJobActivityGraph();
-                            jobLogObject.put("job-specification", jag.getJobSpecification().toJSON());
-                            jobLogObject.put("job-activity-graph", jag.toJSON());
-                            jobLogObject.put("job-run", run.toJSON());
-                        } catch (JSONException e) {
-                            throw new RuntimeException(e);
-                        }
-                        return jobLogObject;
-                    }
-                });
             }
         });
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
new file mode 100644
index 0000000..80ab5ec
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class JobletCleanupNotificationWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(JobletCleanupNotificationWork.class.getName());
+
+    private ClusterControllerService ccs;
+    private JobId jobId;
+    private String nodeId;
+
+    public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId jobId, String nodeId) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void run() {
+        final JobRun run = ccs.getActiveRunMap().get(jobId);
+        Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
+        if (!cleanupPendingNodes.remove(nodeId)) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning(nodeId + " not in pending cleanup nodes set: " + cleanupPendingNodes + " for Job: "
+                        + jobId);
+            }
+            return;
+        }
+        NodeControllerState ncs = ccs.getNodeMap().get(nodeId);
+        if (ncs != null) {
+            ncs.getActiveJobIds().remove(jobId);
+        }
+        if (cleanupPendingNodes.isEmpty()) {
+            CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getJobActivityGraph().getApplicationName());
+            if (appCtx != null) {
+                try {
+                    appCtx.notifyJobFinish(jobId);
+                } catch (HyracksException e) {
+                    e.printStackTrace();
+                }
+            }
+            run.setStatus(run.getPendingStatus(), run.getPendingException());
+            ccs.getActiveRunMap().remove(jobId);
+            ccs.getRunMapArchive().put(jobId, run);
+            try {
+                ccs.getJobLogFile().log(createJobLogObject(run));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+        }
+    }
+
+    private JSONObject createJobLogObject(final JobRun run) {
+        JSONObject jobLogObject = new JSONObject();
+        try {
+            JobActivityGraph jag = run.getJobActivityGraph();
+            jobLogObject.put("job-specification", jag.getJobSpecification().toJSON());
+            jobLogObject.put("job-activity-graph", jag.toJSON());
+            jobLogObject.put("job-run", run.toJSON());
+        } catch (JSONException e) {
+            throw new RuntimeException(e);
+        }
+        return jobLogObject;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 15bee45..53003d0 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -37,6 +37,8 @@
 
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception;
 
+    public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
+
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
 
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 313f92e..fdf49e9 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -38,7 +38,7 @@
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
-    public void cleanUpJob(JobId jobId, JobStatus status) throws Exception;
+    public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
 
     public void notifyRegistration(IClusterController ccs) throws Exception;
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 05590e4..1a98ebd 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.api.job.IJobletEventListener;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -75,6 +76,10 @@
 
     private IJobletEventListener jobletEventListener;
 
+    private JobStatus cleanupStatus;
+
+    private boolean cleanupPending;
+
     public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx) {
         this.nodeController = nodeController;
         this.appCtx = appCtx;
@@ -86,6 +91,7 @@
         counterMap = new HashMap<String, Counter>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
+        cleanupPending = false;
     }
 
     @Override
@@ -140,16 +146,28 @@
 
     public synchronized void notifyTaskComplete(Task task) throws Exception {
         taskMap.remove(task);
-        TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
-        task.dumpProfile(taskProfile);
-        nodeController.getClusterController().notifyTaskComplete(jobId, task.getTaskAttemptId(),
-                nodeController.getId(), taskProfile);
+        try {
+            TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
+            task.dumpProfile(taskProfile);
+            nodeController.getClusterController().notifyTaskComplete(jobId, task.getTaskAttemptId(),
+                    nodeController.getId(), taskProfile);
+        } finally {
+            if (cleanupPending && taskMap.isEmpty()) {
+                performCleanup();
+            }
+        }
     }
 
     public synchronized void notifyTaskFailed(Task task, String details) throws Exception {
         taskMap.remove(task);
-        nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), nodeController.getId(),
-                details);
+        try {
+            nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(),
+                    nodeController.getId(), details);
+        } finally {
+            if (cleanupPending && taskMap.isEmpty()) {
+                performCleanup();
+            }
+        }
     }
 
     public NodeControllerService getNodeController() {
@@ -246,4 +264,22 @@
     public void setJobletEventListener(IJobletEventListener jobletEventListener) {
         this.jobletEventListener = jobletEventListener;
     }
+
+    public synchronized void cleanup(JobStatus status) {
+        cleanupStatus = status;
+        cleanupPending = true;
+        if (taskMap.isEmpty()) {
+            performCleanup();
+        }
+    }
+
+    private void performCleanup() {
+        nodeController.getJobletMap().remove(jobId);
+        IJobletEventListener listener = getJobletEventListener();
+        if (listener != null) {
+            listener.jobletFinish(cleanupStatus);
+        }
+        close();
+        cleanupPending = false;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index f535f3e..3fd10a8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -69,7 +69,7 @@
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
 import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
-import edu.uci.ics.hyracks.control.nc.work.CleanupJobWork;
+import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
 import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
 import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
 import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
@@ -242,8 +242,8 @@
     }
 
     @Override
-    public void cleanUpJob(JobId jobId, JobStatus status) throws Exception {
-        CleanupJobWork cjw = new CleanupJobWork(this, jobId, status);
+    public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
+        CleanupJobletWork cjw = new CleanupJobletWork(this, jobId, status);
         queue.scheduleAndSync(cjw);
     }
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 4a4f3f8..68061f1 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -19,7 +19,9 @@
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Hashtable;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 
@@ -69,6 +71,8 @@
 
     private final Map<PartitionId, PartitionProfile> partitionSendProfile;
 
+    private final Set<Thread> pendingThreads;
+
     private IPartitionCollector[] collectors;
 
     private IOperatorNodePushable operator;
@@ -92,6 +96,7 @@
         opEnv = joblet.getEnvironment(taskId.getTaskId().getActivityId().getOperatorDescriptorId(), taskId.getTaskId()
                 .getPartition());
         partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
+        pendingThreads = new LinkedHashSet<Thread>();
         failed = false;
         errorBaos = new ByteArrayOutputStream();
         errorWriter = new PrintWriter(errorBaos, true);
@@ -181,17 +186,38 @@
         joblet.getExecutor().execute(this);
     }
 
-    public void abort() {
+    public synchronized void abort() {
         aborted = true;
         for (IPartitionCollector c : collectors) {
             c.abort();
         }
+        for (Thread t : pendingThreads) {
+            t.interrupt();
+        }
+    }
+
+    private synchronized void addPendingThread(Thread t) {
+        pendingThreads.add(t);
+    }
+
+    private synchronized void removePendingThread(Thread t) {
+        pendingThreads.remove(t);
+        if (pendingThreads.isEmpty()) {
+            notifyAll();
+        }
+    }
+
+    public synchronized void waitForCompletion() throws InterruptedException {
+        while (!pendingThreads.isEmpty()) {
+            wait();
+        }
     }
 
     @Override
     public void run() {
         Thread ct = Thread.currentThread();
         String threadName = ct.getName();
+        addPendingThread(ct);
         try {
             ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
             operator.initialize();
@@ -205,7 +231,11 @@
                         final int cIdx = i;
                         executor.execute(new Runnable() {
                             public void run() {
+                                if (aborted) {
+                                    return;
+                                }
                                 Thread thread = Thread.currentThread();
+                                addPendingThread(thread);
                                 String oldName = thread.getName();
                                 thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
                                 try {
@@ -220,6 +250,7 @@
                                 } finally {
                                     thread.setName(oldName);
                                     sem.release();
+                                    removePendingThread(thread);
                                 }
                             }
                         });
@@ -242,6 +273,7 @@
         } finally {
             ct.setName(threadName);
             close();
+            removePendingThread(ct);
         }
         if (failed) {
             errorWriter.close();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
similarity index 74%
rename from hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
rename to hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index 7fbc603..4e51142 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -18,15 +18,14 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.hyracks.api.job.IJobletEventListener;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 
-public class CleanupJobWork extends SynchronizableWork {
-    private static final Logger LOGGER = Logger.getLogger(CleanupJobWork.class.getName());
+public class CleanupJobletWork extends SynchronizableWork {
+    private static final Logger LOGGER = Logger.getLogger(CleanupJobletWork.class.getName());
 
     private final NodeControllerService ncs;
 
@@ -34,7 +33,7 @@
 
     private JobStatus status;
 
-    public CleanupJobWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
+    public CleanupJobletWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
         this.ncs = ncs;
         this.jobId = jobId;
         this.status = status;
@@ -45,15 +44,12 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
+        ncs.getPartitionManager().unregisterPartitions(jobId);
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
-        Joblet joblet = jobletMap.remove(jobId);
+        Joblet joblet = jobletMap.get(jobId);
         if (joblet != null) {
-            IJobletEventListener listener = joblet.getJobletEventListener();
-            if (listener != null) {
-                listener.jobletFinish(status);
-            }
-            ncs.getPartitionManager().unregisterPartitions(jobId);
-            joblet.close();
+            joblet.cleanup(status);
         }
+        ncs.getClusterController().notifyJobletCleanup(jobId, ncs.getId());
     }
 }
\ No newline at end of file