Fixed error propagation
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@839 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index cbfbfa3..eaa057b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -250,9 +250,9 @@
}
@Override
- public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, Exception exception)
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details)
throws Exception {
- TaskFailureWork tfe = new TaskFailureWork(this, jobId, taskId, nodeId, exception);
+ TaskFailureWork tfe = new TaskFailureWork(this, jobId, taskId, nodeId, details);
workQueue.schedule(tfe);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 38ee720..48a6e0d 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.hyracks.control.cc.job;
-import java.io.ByteArrayOutputStream;
-import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -302,18 +300,9 @@
taskAttempt.put("task-attempt-id", ta.getTaskAttemptId());
taskAttempt.put("status", ta.getStatus());
taskAttempt.put("node-id", ta.getNodeId());
- Exception e = ta.getException();
- if (e != null) {
- JSONObject ex = new JSONObject();
- ex.put("exception-class", e.getClass().getName());
- ex.put("exception-message", e.getMessage());
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintWriter pw = new PrintWriter(baos);
- e.printStackTrace(pw);
- pw.close();
- ex.put("exception-stacktrace", new String(baos.toByteArray()));
-
- taskAttempt.put("exception", ex);
+ String failureDetails = ta.getFailureDetails();
+ if (failureDetails != null) {
+ taskAttempt.put("failure-details", failureDetails);
}
taskAttempts.put(taskAttempt);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
index 22f5587..43495db 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
@@ -35,7 +35,7 @@
private TaskStatus status;
- private Exception exception;
+ private String failureDetails;
public TaskAttempt(TaskClusterAttempt tcAttempt, TaskAttemptId taskId, Task taskState) {
this.tcAttempt = tcAttempt;
@@ -67,12 +67,12 @@
return status;
}
- public Exception getException() {
- return exception;
+ public String getFailureDetails() {
+ return failureDetails;
}
- public void setStatus(TaskStatus status, Exception exception) {
+ public void setStatus(TaskStatus status, String details) {
this.status = status;
- this.exception = exception;
+ this.failureDetails = details;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index c5df7bf..3feb5a6 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -597,10 +597,10 @@
* - Failed Task Attempt
* @param ac
* - Activity Cluster that owns this Task
- * @param exception
+ * @param details
* - Cause of the failure
*/
- public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, Exception exception) {
+ public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, String details) {
try {
LOGGER.info("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
@@ -608,12 +608,12 @@
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
LOGGER.info("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
- ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
+ ta.setStatus(TaskAttempt.TaskStatus.FAILED, details);
abortTaskCluster(lastAttempt);
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
abortDoomedTaskClusters();
if (lastAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
- abortJob(null);
+ abortJob(new HyracksException(details));
return;
}
startRunnableActivityClusters();
@@ -647,8 +647,7 @@
for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts()) {
assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
if (deadNodes.contains(ta.getNodeId())) {
- ta.setStatus(TaskAttempt.TaskStatus.FAILED,
- new HyracksException("Node " + ta.getNodeId() + " failed"));
+ ta.setStatus(TaskAttempt.TaskStatus.FAILED, "Node " + ta.getNodeId() + " failed");
abort = true;
}
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 31f0872..b80d928 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -21,18 +21,18 @@
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
- private final Exception exception;
+ private final String details;
public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
- Exception exception) {
+ String details) {
super(ccs, jobId, taId, nodeId);
- this.exception = exception;
+ this.details = details;
}
@Override
protected void performEvent(TaskAttempt ta) {
ActivityCluster ac = ta.getTaskState().getTaskCluster().getActivityCluster();
- ac.getJobRun().getScheduler().notifyTaskFailure(ta, ac, exception);
+ ac.getJobRun().getScheduler().notifyTaskFailure(ta, ac, details);
}
@Override
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index ba0bc86..15bee45 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -35,7 +35,7 @@
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception;
- public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, Exception e) throws Exception;
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception;
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 7d39bfd..05590e4 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -146,10 +146,10 @@
nodeController.getId(), taskProfile);
}
- public synchronized void notifyTaskFailed(Task task, Exception exception) throws Exception {
+ public synchronized void notifyTaskFailed(Task task, String details) throws Exception {
taskMap.remove(task);
nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), nodeController.getId(),
- exception);
+ details);
}
public NodeControllerService getNodeController() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index aaaaa21..4a4f3f8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.nc;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Hashtable;
@@ -71,6 +73,12 @@
private IOperatorNodePushable operator;
+ private volatile boolean failed;
+
+ private ByteArrayOutputStream errorBaos;
+
+ private PrintWriter errorWriter;
+
private volatile boolean aborted;
public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
@@ -84,6 +92,9 @@
opEnv = joblet.getEnvironment(taskId.getTaskId().getActivityId().getOperatorDescriptorId(), taskId.getTaskId()
.getPartition());
partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
+ failed = false;
+ errorBaos = new ByteArrayOutputStream();
+ errorWriter = new PrintWriter(errorBaos, true);
}
public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -182,7 +193,7 @@
Thread ct = Thread.currentThread();
String threadName = ct.getName();
try {
- ct.setName(displayName + ": " + taskAttemptId);
+ ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
operator.initialize();
try {
if (collectors.length > 0) {
@@ -191,12 +202,23 @@
final IPartitionCollector collector = collectors[i];
final IFrameWriter writer = operator.getInputFrameWriter(i);
sem.acquire();
+ final int cIdx = i;
executor.execute(new Runnable() {
public void run() {
+ Thread thread = Thread.currentThread();
+ String oldName = thread.getName();
+ thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
try {
pushFrames(collector, writer);
} catch (HyracksDataException e) {
+ synchronized (Task.this) {
+ failed = true;
+ errorWriter.println("Exception caught by thread: " + thread.getName());
+ e.printStackTrace(errorWriter);
+ errorWriter.println();
+ }
} finally {
+ thread.setName(oldName);
sem.release();
}
}
@@ -213,16 +235,22 @@
}
joblet.notifyTaskComplete(this);
} catch (Exception e) {
- e.printStackTrace();
- try {
- joblet.notifyTaskFailed(this, e);
- } catch (Exception e1) {
- e1.printStackTrace();
- }
+ failed = true;
+ errorWriter.println("Exception caught by thread: " + ct.getName());
+ e.printStackTrace(errorWriter);
+ errorWriter.println();
} finally {
ct.setName(threadName);
close();
}
+ if (failed) {
+ errorWriter.close();
+ try {
+ joblet.notifyTaskFailed(this, errorBaos.toString("UTF-8"));
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
+ }
}
private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException {