Merge branch 'master' into raman/fullstack_lsm_staging_coredump
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index dc99ef3..563ee1b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.dataset;
import java.util.HashMap;
+import java.util.List;
public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
public enum Status {
@@ -27,6 +28,8 @@
private Status status;
+ private List<Exception> exceptions;
+
public DatasetJobRecord() {
this.status = Status.RUNNING;
}
@@ -43,7 +46,16 @@
status = Status.FAILED;
}
+ public void fail(List<Exception> exceptions) { // marks this record FAILED and keeps the reported causes
+ status = Status.FAILED;
+ this.exceptions = exceptions; // stores the caller's list reference as-is (no copy is made)
+ }
+
public Status getStatus() {
return status;
}
+
+ public List<Exception> getExceptions() { // returns whatever fail(List) last stored
+ return exceptions; // NOTE(review): appears to stay null if only the no-arg fail() ran; callers should null-check
+ }
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 52e6005..d152cf5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.api.dataset;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -28,7 +30,7 @@
public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
- public void reportJobFailure(JobId jobId);
+ public void reportJobFailure(JobId jobId, List<Exception> exceptions);
public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 33e1b01..97d3744 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -106,12 +106,8 @@
lastMonitor = getMonitor(lastReadPartition);
resultChannel.open(datasetClientCtx);
resultChannel.registerMonitor(lastMonitor);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- } catch (UnknownHostException e) {
- throw new HyracksDataException(e);
} catch (Exception e) {
- // Do nothing here.
+ throw new HyracksDataException(e);
}
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 21b05d4..36082b0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -16,6 +16,7 @@
import java.util.Arrays;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
@@ -51,7 +52,8 @@
}
@Override
- public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+ public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+ throws HyracksException {
DatasetJobRecord djr = jobResultLocations.get(jobId);
if (djr == null) {
djr = new DatasetJobRecord();
@@ -119,10 +121,10 @@
}
@Override
- public synchronized void reportJobFailure(JobId jobId) {
+ public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) { // exceptions: causes to record for jobId
DatasetJobRecord djr = jobResultLocations.get(jobId);
if (djr != null) {
- djr.fail();
+ djr.fail(exceptions); // record keeps the list so getExceptions() can expose it to status readers
}
notifyAll();
}
@@ -196,7 +198,12 @@
}
if (djr.getStatus() == Status.FAILED) {
- throw new HyracksDataException("Job failed.");
+ List<Exception> caughtExceptions = djr.getExceptions();
+ if (caughtExceptions == null || caughtExceptions.isEmpty()) { // no cause recorded (null or empty list): generic failure
+ throw new HyracksDataException("Job failed.");
+ } else {
+ throw new HyracksDataException(caughtExceptions.get(caughtExceptions.size() - 1)); // wrap the last reported exception
+ }
}
ResultSetMetaData resultSetMetaData = djr.get(rsId);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 51eb671..6a04487 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -35,7 +35,7 @@
@Override
protected void performEvent(TaskAttempt ta) {
JobRun run = ccs.getActiveRunMap().get(jobId);
- ccs.getDatasetDirectoryService().reportJobFailure(jobId);
+ ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions); // same exceptions list the scheduler is given below
ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 240322c..fec35ac 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -59,7 +59,8 @@
}
@Override
- public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) throws Exception {
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
+ throws Exception {
CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
exceptions);
ipcHandle.send(-1, fn, null);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 206bf32..33519dc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -82,6 +82,8 @@
private final List<Exception> exceptions;
+ private List<Throwable> caughtExceptions;
+
private volatile boolean aborted;
private NodeControllerService ncs;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 2ec7acc..6d2c043 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -61,22 +62,10 @@
partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
private static final long serialVersionUID = 1L;
- protected boolean removeEldestEntry(Map.Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
+ protected boolean removeEldestEntry(Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
synchronized (DatasetPartitionManager.this) {
if (size() > resultHistorySize) {
- Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(eldest.getValue());
- for (ResultSetId rsId : rsIdMap.keySet()) {
- ResultState[] resultStates = rsIdMap.get(rsId);
- if (resultStates != null) {
- for (int i = 0; i < resultStates.length; i++) {
- ResultState state = resultStates[i];
- if (state != null) {
- state.closeAndDelete();
- LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
- }
- }
- }
- }
+ deinitState(eldest);
return true;
}
return false;
@@ -193,7 +182,28 @@
}
@Override
- public void close() {
+ public synchronized void close() { // now holds this object's monitor while tearing state down
+ for (Entry<JobId, Map<ResultSetId, ResultState[]>> entry : partitionResultStateMap.entrySet()) { // every job still retained
+ deinitState(entry); // closeAndDelete() each of its result states before the registry is closed
+ }
deallocatableRegistry.close();
}
+
+    /** Calls closeAndDelete() on every non-null ResultState held by the given job entry, logging each at FINE. */
+    public void deinitState(Entry<JobId, Map<ResultSetId, ResultState[]>> entry) {
+        Map<ResultSetId, ResultState[]> rsIdMap = entry.getValue();
+        if (rsIdMap != null) {
+            for (ResultState[] resultStates : rsIdMap.values()) { // values(): the keys were only used to look these up again
+                if (resultStates != null) {
+                    for (int i = 0; i < resultStates.length; i++) {
+                        ResultState state = resultStates[i];
+                        if (state != null) {
+                            state.closeAndDelete();
+                            LOGGER.fine("Removing partition: " + i + " for JobId: " + entry.getKey());
+                        }
+                    }
+                }
+            }
+        }
+    }
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index c70a1bd..d96872d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -25,6 +25,7 @@
public class NotifyTaskFailureWork extends AbstractWork {
private final NodeControllerService ncs;
private final Task task;
+
private final List<Exception> exceptions;
public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions) {