Implement read aborting in all the nodes when a job fails.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index ae38c7f..0c5a5bb 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -31,6 +31,8 @@
 
     public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
 
+    public void abortReader(JobId jobId);
+
     public IWorkspaceFileFactory getFileFactory();
 
     public void close();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index b4703b9..a632120 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -146,6 +146,20 @@
     }
 
     @Override
+    public synchronized void abortReader(JobId jobId) {
+        ResultState[] resultStates = partitionResultStateMap.get(jobId);
+
+        if (resultStates == null) {
+            return;
+        }
+        for (ResultState state : resultStates) {
+            if (state != null) {
+                state.abort();
+            }
+        }
+    }
+
+    @Override
     public IWorkspaceFileFactory getFileFactory() {
         return fileFactory;
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 2abbed5..51cfc0d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -78,6 +78,8 @@
     @Override
     public void fail() throws HyracksDataException {
         try {
+            resultState.closeAndDelete();
+            resultState.abort();
             manager.reportPartitionFailure(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index ce37f33..73044a1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -45,6 +45,8 @@
 
     private final AtomicBoolean eos;
 
+    private final AtomicBoolean failed;
+
     private final List<Page> localPageList;
 
     private final List<Integer> inMemoryList;
@@ -70,6 +72,7 @@
         this.fileFactory = fileFactory;
         this.frameSize = frameSize;
         eos = new AtomicBoolean(false);
+        failed = new AtomicBoolean(false);
         localPageList = new ArrayList<Page>();
         inMemoryList = new ArrayList<Integer>();
 
@@ -146,7 +149,7 @@
             throws HyracksDataException {
         long readSize = 0;
         synchronized (this) {
-            while (offset >= size && !eos.get()) {
+            while (offset >= size && !eos.get() && !failed.get()) {
                 try {
                     wait();
                 } catch (InterruptedException e) {
@@ -155,7 +158,7 @@
             }
         }
 
-        if (offset >= size && eos.get()) {
+        if ((offset >= size && eos.get()) || failed.get()) {
             return readSize;
         }
 
@@ -182,6 +185,11 @@
         return readSize;
     }
 
+    public synchronized void abort() {
+        failed.set(true);
+        notifyAll();
+    }
+
     public synchronized Page returnPage() throws HyracksDataException {
         Page page = removePage();
 
@@ -265,13 +273,17 @@
     }
 
     private void initReadFileHandle() throws HyracksDataException {
-        while (fileRef == null) {
+        while (fileRef == null && !failed.get()) {
             try {
                 wait();
             } catch (InterruptedException e) {
                 throw new HyracksDataException(e);
             }
         }
+        if (failed.get()) {
+            return;
+        }
+
         readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index 8f8c032..a078f50 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -46,6 +46,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
         }
+        ncs.getDatasetPartitionManager().abortReader(jobId);
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 3957934..8b9d15a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.nc.work;
 
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.Task;
@@ -32,8 +33,9 @@
     @Override
     public void run() {
         try {
-            ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
-                    ncs.getId(), details);
+            JobId jobId = task.getJobletContext().getJobId();
+            ncs.getDatasetPartitionManager().abortReader(jobId);
+            ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), details);
         } catch (Exception e) {
             e.printStackTrace();
         }