Propagate the actual exceptions thrown in the NCs to the CC and to the client via the API or WebUI.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index dc99ef3..c3bf77a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.dataset;
import java.util.HashMap;
+import java.util.List;
public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
public enum Status {
@@ -27,6 +28,8 @@
private Status status;
+ private List<Throwable> caughtExceptions;
+
public DatasetJobRecord() {
this.status = Status.RUNNING;
}
@@ -43,7 +46,16 @@
status = Status.FAILED;
}
+ public void fail(List<Throwable> caughtExceptions) {
+ status = Status.FAILED;
+ this.caughtExceptions = caughtExceptions;
+ }
+
public Status getStatus() {
return status;
}
+
+ public List<Throwable> getCaughtExceptions() {
+ return caughtExceptions;
+ }
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 52e6005..ccb18b3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.api.dataset;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -28,7 +30,7 @@
public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
- public void reportJobFailure(JobId jobId);
+ public void reportJobFailure(JobId jobId, List<Throwable> caughtExceptions);
public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 6419983..acdbf3d 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -106,12 +106,8 @@
lastMonitor = getMonitor(lastReadPartition);
resultChannel.open(datasetClientCtx);
resultChannel.registerMonitor(lastMonitor);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- } catch (UnknownHostException e) {
- throw new HyracksDataException(e);
} catch (Exception e) {
- // Do nothing here.
+ throw new HyracksDataException(e);
}
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 506a870..f5227e5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -429,7 +429,7 @@
case NOTIFY_TASK_FAILURE: {
CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
- .getTaskId(), ntff.getDetails(), ntff.getDetails()));
+ .getTaskId(), ntff.getDetails(), ntff.getDetails(), ntff.getCaughtExceptions()));
return;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index e648733..03bd96d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -16,6 +16,7 @@
import java.util.Arrays;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
@@ -51,7 +52,8 @@
}
@Override
- public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+ public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+ throws HyracksException {
DatasetJobRecord djr = jobResultLocations.get(jobId);
if (djr == null) {
djr = new DatasetJobRecord();
@@ -117,9 +119,11 @@
}
@Override
- public synchronized void reportJobFailure(JobId jobId) {
+ public synchronized void reportJobFailure(JobId jobId, List<Throwable> caughtExceptions) {
DatasetJobRecord djr = jobResultLocations.get(jobId);
- djr.fail();
+ if (djr != null) {
+ djr.fail(caughtExceptions);
+ }
notifyAll();
}
@@ -192,7 +196,12 @@
}
if (djr.getStatus() == Status.FAILED) {
- throw new HyracksDataException("Job failed.");
+ List<Throwable> caughtExceptions = djr.getCaughtExceptions();
+ if (caughtExceptions == null) {
+ throw new HyracksDataException("Job failed.");
+ } else {
+ throw new HyracksDataException(caughtExceptions.get(caughtExceptions.size() - 1));
+ }
}
ResultSetMetaData resultSetMetaData = djr.get(rsId);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 33e1ff6..d4e06a8 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -29,6 +29,7 @@
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
@@ -173,7 +174,11 @@
wait();
}
if (exception != null) {
- throw new HyracksException("Job Failed", exception);
+ if (exception instanceof HyracksDataException) {
+ throw exception;
+ } else {
+ throw new HyracksException(exception);
+ }
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 34b7dc7..da1683d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -596,7 +596,7 @@
* @param details
* - Cause of the failure
*/
- public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, String details) {
+ public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, String details, List<Throwable> caughtExceptions) {
try {
LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
@@ -610,7 +610,7 @@
lastAttempt.setEndTime(System.currentTimeMillis());
abortDoomedTaskClusters();
if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
- abortJob(new HyracksException(details));
+ abortJob(new HyracksException(caughtExceptions.get(caughtExceptions.size() - 1)));
return;
}
startRunnableActivityClusters();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index bc8c314..bf5f314 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -23,18 +25,21 @@
public class TaskFailureWork extends AbstractTaskLifecycleWork {
private final String details;
+ private final List<Throwable> caughtExceptions;
- public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId, String details) {
+ public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
+ String details, List<Throwable> caughtExceptions) {
super(ccs, jobId, taId, nodeId);
this.details = details;
+ this.caughtExceptions = caughtExceptions;
}
@Override
protected void performEvent(TaskAttempt ta) {
JobRun run = ccs.getActiveRunMap().get(jobId);
- ccs.getDatasetDirectoryService().reportJobFailure(jobId);
+ ccs.getDatasetDirectoryService().reportJobFailure(jobId, caughtExceptions);
ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
- run.getScheduler().notifyTaskFailure(ta, ac, details);
+ run.getScheduler().notifyTaskFailure(ta, ac, details, caughtExceptions);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 627dd55..1a4dc73 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -35,7 +35,8 @@
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception;
- public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception;
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details,
+ List<Throwable> caughtExceptions) throws Exception;
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index f6ab9ba..3755bd2 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -201,12 +201,15 @@
private final TaskAttemptId taskId;
private final String nodeId;
private final String details;
+ private final List<Throwable> caughtExceptions;
- public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+ public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details,
+ List<Throwable> caughtExceptions) {
this.jobId = jobId;
this.taskId = taskId;
this.nodeId = nodeId;
this.details = details;
+ this.caughtExceptions = caughtExceptions;
}
@Override
@@ -229,6 +232,10 @@
public String getDetails() {
return details;
}
+
+ public List<Throwable> getCaughtExceptions() {
+ return caughtExceptions;
+ }
}
public static class NotifyJobletCleanupFunction extends Function {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 057a0f4..8485152 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -57,9 +57,10 @@
}
@Override
- public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details,
+ List<Throwable> caughtExceptions) throws Exception {
CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
- details);
+ details, caughtExceptions);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index d6ea111..eefaf54 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -18,9 +18,11 @@
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -86,6 +88,8 @@
private PrintWriter errorWriter;
+ private List<Throwable> caughtExceptions;
+
private volatile boolean aborted;
private NodeControllerService ncs;
@@ -104,6 +108,7 @@
failed = false;
errorBaos = new ByteArrayOutputStream();
errorWriter = new PrintWriter(errorBaos, true);
+ caughtExceptions = new ArrayList<Throwable>();
this.ncs = ncs;
}
@@ -252,6 +257,7 @@
} catch (HyracksDataException e) {
synchronized (Task.this) {
failed = true;
+ caughtExceptions.add(e);
errorWriter.println("Exception caught by thread: " + thread.getName());
e.printStackTrace(errorWriter);
errorWriter.println();
@@ -277,6 +283,7 @@
ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
} catch (Exception e) {
failed = true;
+ caughtExceptions.add(e);
errorWriter.println("Exception caught by thread: " + ct.getName());
e.printStackTrace(errorWriter);
errorWriter.println();
@@ -289,7 +296,8 @@
errorWriter.close();
NodeControllerService ncs = joblet.getNodeController();
try {
- ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, errorBaos.toString("UTF-8")));
+ ncs.getWorkQueue().schedule(
+ new NotifyTaskFailureWork(ncs, this, errorBaos.toString("UTF-8"), caughtExceptions));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 3957934..2235224 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.nc.work;
+import java.util.List;
+
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.Task;
@@ -22,18 +24,20 @@
private final NodeControllerService ncs;
private final Task task;
private final String details;
+ private final List<Throwable> caughtExceptions;
- public NotifyTaskFailureWork(NodeControllerService ncs, Task task, String details) {
+ public NotifyTaskFailureWork(NodeControllerService ncs, Task task, String details, List<Throwable> caughtExceptions) {
this.ncs = ncs;
this.task = task;
this.details = details;
+ this.caughtExceptions = caughtExceptions;
}
@Override
public void run() {
try {
ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
- ncs.getId(), details);
+ ncs.getId(), details, caughtExceptions);
} catch (Exception e) {
e.printStackTrace();
}