Fixed error propagation

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@839 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index cbfbfa3..eaa057b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -250,9 +250,9 @@
     }
 
     @Override
-    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, Exception exception)
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details)
             throws Exception {
-        TaskFailureWork tfe = new TaskFailureWork(this, jobId, taskId, nodeId, exception);
+        TaskFailureWork tfe = new TaskFailureWork(this, jobId, taskId, nodeId, details);
         workQueue.schedule(tfe);
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 38ee720..48a6e0d 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -14,8 +14,6 @@
  */
 package edu.uci.ics.hyracks.control.cc.job;
 
-import java.io.ByteArrayOutputStream;
-import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -302,18 +300,9 @@
                                 taskAttempt.put("task-attempt-id", ta.getTaskAttemptId());
                                 taskAttempt.put("status", ta.getStatus());
                                 taskAttempt.put("node-id", ta.getNodeId());
-                                Exception e = ta.getException();
-                                if (e != null) {
-                                    JSONObject ex = new JSONObject();
-                                    ex.put("exception-class", e.getClass().getName());
-                                    ex.put("exception-message", e.getMessage());
-                                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                                    PrintWriter pw = new PrintWriter(baos);
-                                    e.printStackTrace(pw);
-                                    pw.close();
-                                    ex.put("exception-stacktrace", new String(baos.toByteArray()));
-
-                                    taskAttempt.put("exception", ex);
+                                String failureDetails = ta.getFailureDetails();
+                                if (failureDetails != null) {
+                                    taskAttempt.put("failure-details", failureDetails);
                                 }
                                 taskAttempts.put(taskAttempt);
                             }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
index 22f5587..43495db 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
@@ -35,7 +35,7 @@
 
     private TaskStatus status;
 
-    private Exception exception;
+    private String failureDetails;
 
     public TaskAttempt(TaskClusterAttempt tcAttempt, TaskAttemptId taskId, Task taskState) {
         this.tcAttempt = tcAttempt;
@@ -67,12 +67,12 @@
         return status;
     }
 
-    public Exception getException() {
-        return exception;
+    public String getFailureDetails() {
+        return failureDetails;
     }
 
-    public void setStatus(TaskStatus status, Exception exception) {
+    public void setStatus(TaskStatus status, String details) {
         this.status = status;
-        this.exception = exception;
+        this.failureDetails = details;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index c5df7bf..3feb5a6 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -597,10 +597,10 @@
      *            - Failed Task Attempt
      * @param ac
      *            - Activity Cluster that owns this Task
-     * @param exception
+     * @param details
      *            - Cause of the failure
      */
-    public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, Exception exception) {
+    public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, String details) {
         try {
             LOGGER.info("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
@@ -608,12 +608,12 @@
             TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
             if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
                 LOGGER.info("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
-                ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
+                ta.setStatus(TaskAttempt.TaskStatus.FAILED, details);
                 abortTaskCluster(lastAttempt);
                 lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
                 abortDoomedTaskClusters();
                 if (lastAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
-                    abortJob(null);
+                    abortJob(new HyracksException(details));
                     return;
                 }
                 startRunnableActivityClusters();
@@ -647,8 +647,7 @@
                             for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts()) {
                                 assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
                                 if (deadNodes.contains(ta.getNodeId())) {
-                                    ta.setStatus(TaskAttempt.TaskStatus.FAILED,
-                                            new HyracksException("Node " + ta.getNodeId() + " failed"));
+                                    ta.setStatus(TaskAttempt.TaskStatus.FAILED, "Node " + ta.getNodeId() + " failed");
                                     abort = true;
                                 }
                             }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 31f0872..b80d928 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -21,18 +21,18 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
-    private final Exception exception;
+    private final String details;
 
     public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
-            Exception exception) {
+            String details) {
         super(ccs, jobId, taId, nodeId);
-        this.exception = exception;
+        this.details = details;
     }
 
     @Override
     protected void performEvent(TaskAttempt ta) {
         ActivityCluster ac = ta.getTaskState().getTaskCluster().getActivityCluster();
-        ac.getJobRun().getScheduler().notifyTaskFailure(ta, ac, exception);
+        ac.getJobRun().getScheduler().notifyTaskFailure(ta, ac, details);
     }
 
     @Override
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index ba0bc86..15bee45 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -35,7 +35,7 @@
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception;
 
-    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, Exception e) throws Exception;
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception;
 
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 7d39bfd..05590e4 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -146,10 +146,10 @@
                 nodeController.getId(), taskProfile);
     }
 
-    public synchronized void notifyTaskFailed(Task task, Exception exception) throws Exception {
+    public synchronized void notifyTaskFailed(Task task, String details) throws Exception {
         taskMap.remove(task);
         nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), nodeController.getId(),
-                exception);
+                details);
     }
 
     public NodeControllerService getNodeController() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index aaaaa21..4a4f3f8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.nc;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -71,6 +73,12 @@
 
     private IOperatorNodePushable operator;
 
+    private volatile boolean failed;
+
+    private ByteArrayOutputStream errorBaos;
+
+    private PrintWriter errorWriter;
+
     private volatile boolean aborted;
 
     public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
@@ -84,6 +92,9 @@
         opEnv = joblet.getEnvironment(taskId.getTaskId().getActivityId().getOperatorDescriptorId(), taskId.getTaskId()
                 .getPartition());
         partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
+        failed = false;
+        errorBaos = new ByteArrayOutputStream();
+        errorWriter = new PrintWriter(errorBaos, true);
     }
 
     public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -182,7 +193,7 @@
         Thread ct = Thread.currentThread();
         String threadName = ct.getName();
         try {
-            ct.setName(displayName + ": " + taskAttemptId);
+            ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
             operator.initialize();
             try {
                 if (collectors.length > 0) {
@@ -191,12 +202,23 @@
                         final IPartitionCollector collector = collectors[i];
                         final IFrameWriter writer = operator.getInputFrameWriter(i);
                         sem.acquire();
+                        final int cIdx = i;
                         executor.execute(new Runnable() {
                             public void run() {
+                                Thread thread = Thread.currentThread();
+                                String oldName = thread.getName();
+                                thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
                                 try {
                                     pushFrames(collector, writer);
                                 } catch (HyracksDataException e) {
+                                    synchronized (Task.this) {
+                                        failed = true;
+                                        errorWriter.println("Exception caught by thread: " + thread.getName());
+                                        e.printStackTrace(errorWriter);
+                                        errorWriter.println();
+                                    }
                                 } finally {
+                                    thread.setName(oldName);
                                     sem.release();
                                 }
                             }
@@ -213,16 +235,22 @@
             }
             joblet.notifyTaskComplete(this);
         } catch (Exception e) {
-            e.printStackTrace();
-            try {
-                joblet.notifyTaskFailed(this, e);
-            } catch (Exception e1) {
-                e1.printStackTrace();
-            }
+            failed = true;
+            errorWriter.println("Exception caught by thread: " + ct.getName());
+            e.printStackTrace(errorWriter);
+            errorWriter.println();
         } finally {
             ct.setName(threadName);
             close();
         }
+        if (failed) {
+            errorWriter.close();
+            try {
+                joblet.notifyTaskFailed(this, errorBaos.toString("UTF-8"));
+            } catch (Exception e1) {
+                e1.printStackTrace();
+            }
+        }
     }
 
     private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException {