Merge branch 'master' into vinayakb/error_reporting

Merging latest master
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 9b8a996..5976760 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -459,7 +459,7 @@
                 case NOTIFY_TASK_FAILURE: {
                     CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
                     workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
-                            .getTaskId(), ntff.getDetails(), ntff.getDetails()));
+                            .getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
                     return;
                 }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 827a71e..3a6dc26 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -78,11 +78,11 @@
 
     private JobStatus status;
 
-    private Exception exception;
+    private List<Exception> exceptions;
 
     private JobStatus pendingStatus;
 
-    private Exception pendingException;
+    private List<Exception> pendingExceptions;
 
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
             IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
@@ -124,9 +124,9 @@
         return pmm;
     }
 
-    public synchronized void setStatus(JobStatus status, Exception exception) {
+    public synchronized void setStatus(JobStatus status, List<Exception> exceptions) {
         this.status = status;
-        this.exception = exception;
+        this.exceptions = exceptions;
         notifyAll();
     }
 
@@ -134,21 +134,21 @@
         return status;
     }
 
-    public synchronized Exception getException() {
-        return exception;
+    public synchronized List<Exception> getExceptions() {
+        return exceptions;
     }
 
-    public void setPendingStatus(JobStatus status, Exception exception) {
+    public void setPendingStatus(JobStatus status, List<Exception> exceptions) {
         this.pendingStatus = status;
-        this.pendingException = exception;
+        this.pendingExceptions = exceptions;
     }
 
     public JobStatus getPendingStatus() {
         return pendingStatus;
     }
 
-    public synchronized Exception getPendingException() {
-        return pendingException;
+    public synchronized List<Exception> getPendingExceptions() {
+        return pendingExceptions;
     }
 
     public long getCreateTime() {
@@ -180,8 +180,18 @@
         while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
             wait();
         }
-        if (exception != null) {
-            throw new HyracksException("Job Failed", exception);
+        if (exceptions != null && !exceptions.isEmpty()) {
+            StringBuilder buffer = new StringBuilder();
+            buffer.append("Job failed on account of:\n");
+            for (Exception e : exceptions) {
+                buffer.append(e.getMessage()).append('\n');
+            }
+            HyracksException he;
+            he = new HyracksException(buffer.toString(), exceptions.get(0));
+            for (int i = 1; i < exceptions.size(); ++i) {
+                he.addSuppressed(exceptions.get(i));
+            }
+            throw he;
         }
     }
 
@@ -333,9 +343,9 @@
                                 taskAttempt.put("node-id", ta.getNodeId());
                                 taskAttempt.put("start-time", ta.getStartTime());
                                 taskAttempt.put("end-time", ta.getEndTime());
-                                String failureDetails = ta.getFailureDetails();
-                                if (failureDetails != null) {
-                                    taskAttempt.put("failure-details", failureDetails);
+                                List<Exception> exceptions = ta.getExceptions();
+                                if (exceptions != null && !exceptions.isEmpty()) {
+                                    taskAttempt.put("failure-details", exceptions);
                                 }
                                 taskAttempts.put(taskAttempt);
                             }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
index 7c0dd57..b323501 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskAttempt.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.cc.job;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 
 public class TaskAttempt {
@@ -35,7 +37,7 @@
 
     private TaskStatus status;
 
-    private String failureDetails;
+    private List<Exception> exceptions;
 
     private long startTime;
 
@@ -73,13 +75,13 @@
         return status;
     }
 
-    public String getFailureDetails() {
-        return failureDetails;
+    public List<Exception> getExceptions() {
+        return exceptions;
     }
 
-    public void setStatus(TaskStatus status, String details) {
+    public void setStatus(TaskStatus status, List<Exception> exceptions) {
         this.status = status;
-        this.failureDetails = details;
+        this.exceptions = exceptions;
     }
 
     public long getStartTime() {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index f3d7d34..d09b641 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -16,6 +16,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -454,13 +455,13 @@
         }
     }
 
-    private void abortJob(Exception exception) {
+    private void abortJob(List<Exception> exceptions) {
         Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<TaskCluster>(inProgressTaskClusters);
         for (TaskCluster tc : inProgressTaskClustersCopy) {
             abortTaskCluster(findLastTaskClusterAttempt(tc));
         }
         assert inProgressTaskClusters.isEmpty();
-        ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exception));
+        ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exceptions));
     }
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
@@ -598,7 +599,7 @@
-     * @param details
+     * @param exceptions
      *            - Cause of the failure
      */
-    public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, String details) {
+    public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, List<Exception> exceptions) {
         try {
             LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
@@ -606,13 +607,13 @@
             TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
             if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
                 LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
-                ta.setStatus(TaskAttempt.TaskStatus.FAILED, details);
+                ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
                 abortTaskCluster(lastAttempt);
                 lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
                 lastAttempt.setEndTime(System.currentTimeMillis());
                 abortDoomedTaskClusters();
                 if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
-                    abortJob(new HyracksException(details));
+                    abortJob(exceptions);
                     return;
                 }
                 startRunnableActivityClusters();
@@ -621,7 +622,7 @@
                         + lastAttempt);
             }
         } catch (Exception e) {
-            abortJob(e);
+            abortJob(Collections.singletonList(e));
         }
     }
 
@@ -646,7 +647,10 @@
                             for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
                                 assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
                                 if (deadNodes.contains(ta.getNodeId())) {
-                                    ta.setStatus(TaskAttempt.TaskStatus.FAILED, "Node " + ta.getNodeId() + " failed");
+                                    ta.setStatus(
+                                            TaskAttempt.TaskStatus.FAILED,
+                                            Collections.singletonList(new Exception("Node " + ta.getNodeId()
+                                                    + " failed")));
                                     ta.setEndTime(System.currentTimeMillis());
                                     abort = true;
                                 }
@@ -661,7 +665,7 @@
             }
             startRunnableActivityClusters();
         } catch (Exception e) {
-            abortJob(e);
+            abortJob(Collections.singletonList(e));
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index b304b21..7954c7c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.List;
 import java.util.Set;
 import java.util.logging.Logger;
 
@@ -36,13 +37,13 @@
     private ClusterControllerService ccs;
     private JobId jobId;
     private JobStatus status;
-    private Exception exception;
+    private List<Exception> exceptions;
 
-    public JobCleanupWork(ClusterControllerService ccs, JobId jobId, JobStatus status, Exception exception) {
+    public JobCleanupWork(ClusterControllerService ccs, JobId jobId, JobStatus status, List<Exception> exceptions) {
         this.ccs = ccs;
         this.jobId = jobId;
         this.status = status;
-        this.exception = exception;
+        this.exceptions = exceptions;
     }
 
     @Override
@@ -58,7 +59,7 @@
         }
         Set<String> targetNodes = run.getParticipatingNodeIds();
         run.getCleanupPendingNodeIds().addAll(targetNodes);
-        run.setPendingStatus(status, exception);
+        run.setPendingStatus(status, exceptions);
         if (targetNodes != null && !targetNodes.isEmpty()) {
             for (String n : targetNodes) {
                 NodeControllerState ncs = ccs.getNodeMap().get(n);
@@ -77,7 +78,7 @@
                     e.printStackTrace();
                 }
             }
-            run.setStatus(run.getPendingStatus(), run.getPendingException());
+            run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
             ccs.getActiveRunMap().remove(jobId);
             ccs.getRunMapArchive().put(jobId, run);
             try {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index 7ecdd16..0e739ba 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.Collections;
 import java.util.EnumSet;
 
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -62,7 +63,8 @@
             try {
                 run.getScheduler().startJob();
             } catch (Exception e) {
-                ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, run.getJobId(), JobStatus.FAILURE, e));
+                ccs.getWorkQueue().schedule(
+                        new JobCleanupWork(ccs, run.getJobId(), JobStatus.FAILURE, Collections.singletonList(e)));
             }
             callback.setValue(jobId);
         } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index ed58c43..65e1519 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -67,7 +67,7 @@
                     e.printStackTrace();
                 }
             }
-            run.setStatus(run.getPendingStatus(), run.getPendingException());
+            run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
             ccs.getActiveRunMap().remove(jobId);
             ccs.getRunMapArchive().put(jobId, run);
             try {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index bc8c314..51eb671 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -22,11 +24,12 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
-    private final String details;
+    private final List<Exception> exceptions;
 
-    public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId, String details) {
+    public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
+            List<Exception> exceptions) {
         super(ccs, jobId, taId, nodeId);
-        this.details = details;
+        this.exceptions = exceptions;
     }
 
     @Override
@@ -34,7 +37,7 @@
         JobRun run = ccs.getActiveRunMap().get(jobId);
         ccs.getDatasetDirectoryService().reportJobFailure(jobId);
         ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
-        run.getScheduler().notifyTaskFailure(ta, ac, details);
+        run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 47a5c09..9954dc3 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -37,7 +37,8 @@
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception;
 
-    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception;
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
+            throws Exception;
 
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index e343657..a6382de 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -213,13 +213,13 @@
         private final JobId jobId;
         private final TaskAttemptId taskId;
         private final String nodeId;
-        private final String details;
+        private final List<Exception> exceptions;
 
-        public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+        public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) {
             this.jobId = jobId;
             this.taskId = taskId;
             this.nodeId = nodeId;
-            this.details = details;
+            this.exceptions = exceptions;
         }
 
         @Override
@@ -239,8 +239,8 @@
             return nodeId;
         }
 
-        public String getDetails() {
-            return details;
+        public List<Exception> getExceptions() {
+            return exceptions;
         }
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 5ed65cc..240322c 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -59,9 +59,9 @@
     }
 
     @Override
-    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) throws Exception {
         CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
-                details);
+                exceptions);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index ac76c16..206bf32 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -14,13 +14,12 @@
  */
 package edu.uci.ics.hyracks.control.nc;
 
-import java.io.ByteArrayOutputStream;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -81,11 +80,7 @@
 
     private IOperatorNodePushable operator;
 
-    private volatile boolean failed;
-
-    private ByteArrayOutputStream errorBaos;
-
-    private PrintWriter errorWriter;
+    private final List<Exception> exceptions;
 
     private volatile boolean aborted;
 
@@ -102,9 +97,7 @@
         opEnv = joblet.getEnvironment();
         partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
         pendingThreads = new LinkedHashSet<Thread>();
-        failed = false;
-        errorBaos = new ByteArrayOutputStream();
-        errorWriter = new PrintWriter(errorBaos, true);
+        exceptions = new ArrayList<>();
         this.ncs = ncs;
     }
 
@@ -252,10 +245,7 @@
                                     pushFrames(collector, writer);
                                 } catch (HyracksDataException e) {
                                     synchronized (Task.this) {
-                                        failed = true;
-                                        errorWriter.println("Exception caught by thread: " + thread.getName());
-                                        e.printStackTrace(errorWriter);
-                                        errorWriter.println();
+                                        exceptions.add(e);
                                     }
                                 } finally {
                                     thread.setName(oldName);
@@ -277,23 +267,15 @@
             NodeControllerService ncs = joblet.getNodeController();
             ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
         } catch (Exception e) {
-            failed = true;
-            errorWriter.println("Exception caught by thread: " + ct.getName());
-            e.printStackTrace(errorWriter);
-            errorWriter.println();
+            exceptions.add(e);
         } finally {
             ct.setName(threadName);
             close();
             removePendingThread(ct);
         }
-        if (failed) {
-            errorWriter.close();
+        if (!exceptions.isEmpty()) {
             NodeControllerService ncs = joblet.getNodeController();
-            try {
-                ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, errorBaos.toString("UTF-8")));
-            } catch (UnsupportedEncodingException e) {
-                e.printStackTrace();
-            }
+            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
         }
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 013544d..c70a1bd 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.nc.work;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -23,12 +25,12 @@
 public class NotifyTaskFailureWork extends AbstractWork {
     private final NodeControllerService ncs;
     private final Task task;
-    private final String details;
+    private final List<Exception> exceptions;
 
-    public NotifyTaskFailureWork(NodeControllerService ncs, Task task, String details) {
+    public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions) {
         this.ncs = ncs;
         this.task = task;
-        this.details = details;
+        this.exceptions = exceptions;
     }
 
     @Override
@@ -39,7 +41,7 @@
             if (dpm != null) {
                 dpm.abortReader(jobId);
             }
-            ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), details);
+            ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), exceptions);
         } catch (Exception e) {
             e.printStackTrace();
         }