Fixed concurrent access to taskMap in Joblet
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1218 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index d2a7977..e8b282c 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -113,6 +113,13 @@
taskMap.put(task.getTaskAttemptId(), task);
}
+ public void removeTask(Task task) {
+ taskMap.remove(task.getTaskAttemptId());
+ if (cleanupPending && taskMap.isEmpty()) {
+ performCleanup();
+ }
+ }
+
public Map<TaskAttemptId, Task> getTaskMap() {
return taskMap;
}
@@ -139,37 +146,11 @@
}
}
- public synchronized void notifyTaskComplete(Task task) throws Exception {
- taskMap.remove(task.getTaskAttemptId());
- try {
- TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
- task.dumpProfile(taskProfile);
- nodeController.getClusterController().notifyTaskComplete(jobId, task.getTaskAttemptId(),
- nodeController.getId(), taskProfile);
- } finally {
- if (cleanupPending && taskMap.isEmpty()) {
- performCleanup();
- }
- }
- }
-
- public synchronized void notifyTaskFailed(Task task, String details) throws Exception {
- taskMap.remove(task.getTaskAttemptId());
- try {
- nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(),
- nodeController.getId(), details);
- } finally {
- if (cleanupPending && taskMap.isEmpty()) {
- performCleanup();
- }
- }
- }
-
public NodeControllerService getNodeController() {
return nodeController;
}
- public synchronized void dumpProfile(JobletProfile jProfile) {
+ public void dumpProfile(JobletProfile jProfile) {
Map<String, Long> counters = jProfile.getCounters();
for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
counters.put(e.getKey(), e.getValue().get());
@@ -260,7 +241,7 @@
this.jobletEventListener = jobletEventListener;
}
- public synchronized void cleanup(JobStatus status) {
+ public void cleanup(JobStatus status) {
cleanupStatus = status;
cleanupPending = true;
if (taskMap.isEmpty()) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index c3d3c34..f3d00ac 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -261,6 +261,10 @@
return ncConfig;
}
+ public WorkQueue getWorkQueue() {
+ return queue;
+ }
+
private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
String ipaddrStr = ncConfig.dataIPAddress;
ipaddrStr = ipaddrStr.trim();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 8761c2f..1bb0b2f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -16,6 +16,7 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Hashtable;
@@ -51,6 +52,8 @@
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskCompleteWork;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskFailureWork;
public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
private final Joblet joblet;
@@ -165,6 +168,10 @@
return this;
}
+ public Joblet getJoblet() {
+ return joblet;
+ }
+
public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
return partitionSendProfile;
}
@@ -263,7 +270,8 @@
} finally {
operator.deinitialize();
}
- joblet.notifyTaskComplete(this);
+ NodeControllerService ncs = joblet.getNodeController();
+ ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
} catch (Exception e) {
failed = true;
errorWriter.println("Exception caught by thread: " + ct.getName());
@@ -276,10 +284,11 @@
}
if (failed) {
errorWriter.close();
+ NodeControllerService ncs = joblet.getNodeController();
try {
- joblet.notifyTaskFailed(this, errorBaos.toString("UTF-8"));
- } catch (Exception e1) {
- e1.printStackTrace();
+ ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, errorBaos.toString("UTF-8")));
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
}
}
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
new file mode 100644
index 0000000..022c92f
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskCompleteWork extends AbstractWork {
+ private final NodeControllerService ncs;
+ private final Task task;
+
+ public NotifyTaskCompleteWork(NodeControllerService ncs, Task task) {
+ this.ncs = ncs;
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
+ task.dumpProfile(taskProfile);
+ try {
+ ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+ ncs.getId(), taskProfile);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ task.getJoblet().removeTask(task);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
new file mode 100644
index 0000000..3957934
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskFailureWork extends AbstractWork {
+ private final NodeControllerService ncs;
+ private final Task task;
+ private final String details;
+
+ public NotifyTaskFailureWork(NodeControllerService ncs, Task task, String details) {
+ this.ncs = ncs;
+ this.task = task;
+ this.details = details;
+ }
+
+ @Override
+ public void run() {
+ try {
+ ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+ ncs.getId(), details);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ task.getJoblet().removeTask(task);
+ }
+}
\ No newline at end of file