Merge branch 'master' into raman/fullstack_lsm_staging_coredump
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index dc99ef3..563ee1b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.dataset;
 
 import java.util.HashMap;
+import java.util.List;
 
 public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
     public enum Status {
@@ -27,6 +28,8 @@
 
     private Status status;
 
+    private List<Exception> exceptions;
+
     public DatasetJobRecord() {
         this.status = Status.RUNNING;
     }
@@ -43,7 +46,16 @@
         status = Status.FAILED;
     }
 
+    public void fail(List<Exception> exceptions) {
+        status = Status.FAILED;
+        this.exceptions = exceptions;
+    }
+
     public Status getStatus() {
         return status;
     }
+
+    public List<Exception> getExceptions() {
+        return exceptions;
+    }
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 52e6005..d152cf5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -28,7 +30,7 @@
 
     public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
 
-    public void reportJobFailure(JobId jobId);
+    public void reportJobFailure(JobId jobId, List<Exception> exceptions);
 
     public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
 
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 33e1b01..97d3744 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -106,12 +106,8 @@
                     lastMonitor = getMonitor(lastReadPartition);
                     resultChannel.open(datasetClientCtx);
                     resultChannel.registerMonitor(lastMonitor);
-                } catch (HyracksException e) {
-                    throw new HyracksDataException(e);
-                } catch (UnknownHostException e) {
-                    throw new HyracksDataException(e);
                 } catch (Exception e) {
-                    // Do nothing here.
+                    throw new HyracksDataException(e);
                 }
             }
         }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 21b05d4..36082b0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -16,6 +16,7 @@
 
 import java.util.Arrays;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
@@ -51,7 +52,8 @@
     }
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
+            throws HyracksException {
         DatasetJobRecord djr = jobResultLocations.get(jobId);
         if (djr == null) {
             djr = new DatasetJobRecord();
@@ -119,10 +121,10 @@
     }
 
     @Override
-    public synchronized void reportJobFailure(JobId jobId) {
+    public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
         DatasetJobRecord djr = jobResultLocations.get(jobId);
         if (djr != null) {
-            djr.fail();
+            djr.fail(exceptions);
         }
         notifyAll();
     }
@@ -196,7 +198,12 @@
         }
 
         if (djr.getStatus() == Status.FAILED) {
-            throw new HyracksDataException("Job failed.");
+            List<Exception> caughtExceptions = djr.getExceptions();
+            if (caughtExceptions == null) {
+                throw new HyracksDataException("Job failed.");
+            } else {
+                throw new HyracksDataException(caughtExceptions.get(caughtExceptions.size() - 1));
+            }
         }
 
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 51eb671..6a04487 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -35,7 +35,7 @@
     @Override
     protected void performEvent(TaskAttempt ta) {
         JobRun run = ccs.getActiveRunMap().get(jobId);
-        ccs.getDatasetDirectoryService().reportJobFailure(jobId);
+        ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
         ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
         run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 240322c..fec35ac 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -59,7 +59,8 @@
     }
 
     @Override
-    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) throws Exception {
+    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
+            throws Exception {
         CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
                 exceptions);
         ipcHandle.send(-1, fn, null);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 206bf32..33519dc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -82,6 +82,8 @@
 
     private final List<Exception> exceptions;
 
+    private List<Throwable> caughtExceptions;
+
     private volatile boolean aborted;
 
     private NodeControllerService ncs;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 2ec7acc..6d2c043 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -17,6 +17,7 @@
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Executor;
 import java.util.logging.Logger;
 
@@ -61,22 +62,10 @@
         partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
             private static final long serialVersionUID = 1L;
 
-            protected boolean removeEldestEntry(Map.Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
+            protected boolean removeEldestEntry(Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
                 synchronized (DatasetPartitionManager.this) {
                     if (size() > resultHistorySize) {
-                        Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(eldest.getValue());
-                        for (ResultSetId rsId : rsIdMap.keySet()) {
-                            ResultState[] resultStates = rsIdMap.get(rsId);
-                            if (resultStates != null) {
-                                for (int i = 0; i < resultStates.length; i++) {
-                                    ResultState state = resultStates[i];
-                                    if (state != null) {
-                                        state.closeAndDelete();
-                                        LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
-                                    }
-                                }
-                            }
-                        }
+                        deinitState(eldest);
                         return true;
                     }
                     return false;
@@ -193,7 +182,28 @@
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
+        for (Entry<JobId, Map<ResultSetId, ResultState[]>> entry : partitionResultStateMap.entrySet()) {
+            deinitState(entry);
+        }
         deallocatableRegistry.close();
     }
+
+    public void deinitState(Entry<JobId, Map<ResultSetId, ResultState[]>> entry) {
+        Map<ResultSetId, ResultState[]> rsIdMap = entry.getValue();
+        if (rsIdMap != null) {
+            for (ResultSetId rsId : rsIdMap.keySet()) {
+                ResultState[] resultStates = rsIdMap.get(rsId);
+                if (resultStates != null) {
+                    for (int i = 0; i < resultStates.length; i++) {
+                        ResultState state = resultStates[i];
+                        if (state != null) {
+                            state.closeAndDelete();
+                            LOGGER.fine("Removing partition: " + i + " for JobId: " + entry.getKey());
+                        }
+                    }
+                }
+            }
+        }
+    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index c70a1bd..d96872d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -25,6 +25,7 @@
 public class NotifyTaskFailureWork extends AbstractWork {
     private final NodeControllerService ncs;
     private final Task task;
+
     private final List<Exception> exceptions;
 
     public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions) {