Fixed concurrent access to taskMap in Joblet

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1218 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index d2a7977..e8b282c 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -113,6 +113,13 @@
         taskMap.put(task.getTaskAttemptId(), task);
     }
 
+    public void removeTask(Task task) {
+        taskMap.remove(task.getTaskAttemptId());
+        if (cleanupPending && taskMap.isEmpty()) {
+            performCleanup();
+        }
+    }
+
     public Map<TaskAttemptId, Task> getTaskMap() {
         return taskMap;
     }
@@ -139,37 +146,11 @@
         }
     }
 
-    public synchronized void notifyTaskComplete(Task task) throws Exception {
-        taskMap.remove(task.getTaskAttemptId());
-        try {
-            TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
-            task.dumpProfile(taskProfile);
-            nodeController.getClusterController().notifyTaskComplete(jobId, task.getTaskAttemptId(),
-                    nodeController.getId(), taskProfile);
-        } finally {
-            if (cleanupPending && taskMap.isEmpty()) {
-                performCleanup();
-            }
-        }
-    }
-
-    public synchronized void notifyTaskFailed(Task task, String details) throws Exception {
-        taskMap.remove(task.getTaskAttemptId());
-        try {
-            nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(),
-                    nodeController.getId(), details);
-        } finally {
-            if (cleanupPending && taskMap.isEmpty()) {
-                performCleanup();
-            }
-        }
-    }
-
     public NodeControllerService getNodeController() {
         return nodeController;
     }
 
-    public synchronized void dumpProfile(JobletProfile jProfile) {
+    public void dumpProfile(JobletProfile jProfile) {
         Map<String, Long> counters = jProfile.getCounters();
         for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
             counters.put(e.getKey(), e.getValue().get());
@@ -260,7 +241,7 @@
         this.jobletEventListener = jobletEventListener;
     }
 
-    public synchronized void cleanup(JobStatus status) {
+    public void cleanup(JobStatus status) {
         cleanupStatus = status;
         cleanupPending = true;
         if (taskMap.isEmpty()) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index c3d3c34..f3d00ac 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -261,6 +261,10 @@
         return ncConfig;
     }
 
+    public WorkQueue getWorkQueue() {
+        return queue;
+    }
+
     private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
         String ipaddrStr = ncConfig.dataIPAddress;
         ipaddrStr = ipaddrStr.trim();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 8761c2f..1bb0b2f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -16,6 +16,7 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -51,6 +52,8 @@
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskCompleteWork;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskFailureWork;
 
 public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
     private final Joblet joblet;
@@ -165,6 +168,10 @@
         return this;
     }
 
+    public Joblet getJoblet() {
+        return joblet;
+    }
+
     public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
         return partitionSendProfile;
     }
@@ -263,7 +270,8 @@
             } finally {
                 operator.deinitialize();
             }
-            joblet.notifyTaskComplete(this);
+            NodeControllerService ncs = joblet.getNodeController();
+            ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
         } catch (Exception e) {
             failed = true;
             errorWriter.println("Exception caught by thread: " + ct.getName());
@@ -276,10 +284,11 @@
         }
         if (failed) {
             errorWriter.close();
+            NodeControllerService ncs = joblet.getNodeController();
             try {
-                joblet.notifyTaskFailed(this, errorBaos.toString("UTF-8"));
-            } catch (Exception e1) {
-                e1.printStackTrace();
+                ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, errorBaos.toString("UTF-8")));
+            } catch (UnsupportedEncodingException e) {
+                e.printStackTrace();
             }
         }
     }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
new file mode 100644
index 0000000..022c92f
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskCompleteWork extends AbstractWork {
+    private final NodeControllerService ncs;
+    private final Task task;
+
+    public NotifyTaskCompleteWork(NodeControllerService ncs, Task task) {
+        this.ncs = ncs;
+        this.task = task;
+    }
+
+    @Override
+    public void run() {
+        TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
+        task.dumpProfile(taskProfile);
+        try {
+            ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+                    ncs.getId(), taskProfile);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        task.getJoblet().removeTask(task);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
new file mode 100644
index 0000000..3957934
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskFailureWork extends AbstractWork {
+    private final NodeControllerService ncs;
+    private final Task task;
+    private final String details;
+
+    public NotifyTaskFailureWork(NodeControllerService ncs, Task task, String details) {
+        this.ncs = ncs;
+        this.task = task;
+        this.details = details;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+                    ncs.getId(), details);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        task.getJoblet().removeTask(task);
+    }
+}
\ No newline at end of file