Abort dataset result readers on all nodes when a job fails.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index ae38c7f..0c5a5bb 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -31,6 +31,8 @@
public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
+ public void abortReader(JobId jobId);
+
public IWorkspaceFileFactory getFileFactory();
public void close();
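A brief, hedged reading of the contract behind the new method (stand-in types only; the real interface also exposes the reader/writer setup shown above, and JobId is replaced by a plain long here for brevity): abortReader should be callable from a node-controller work thread, should tolerate jobs that never registered any result partitions, and should unblock readers rather than tear state down.

// Hypothetical stand-in mirroring the one-method slice this hunk adds to
// IDatasetPartitionManager; illustration only.
interface ReaderAbortHook {
    // No-op for unknown jobs; for known jobs, wake every blocked result reader.
    void abortReader(long jobId);
}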
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index b4703b9..a632120 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -146,6 +146,20 @@
}
@Override
+ public synchronized void abortReader(JobId jobId) {
+ ResultState[] resultStates = partitionResultStateMap.get(jobId);
+
+ if (resultStates == null) {
+ return;
+ }
+ for (ResultState state : resultStates) {
+ if (state != null) {
+ state.abort();
+ }
+ }
+ }
+
+ @Override
public IWorkspaceFileFactory getFileFactory() {
return fileFactory;
}
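A minimal sketch of the sweep this implementation performs, with hypothetical stand-ins in place of the real ResultState, JobId, and map types: the per-job entry may be missing entirely, and individual partition slots may still be null, so both cases are guarded before abort() is called.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for the real Hyracks classes, for illustration only.
final class ResultStateSketch {
    private final AtomicBoolean failed = new AtomicBoolean(false);

    // Same shape as ResultState#abort(): flag the failure and wake blocked readers.
    synchronized void abort() {
        failed.set(true);
        notifyAll();
    }
}

final class PartitionManagerSketch {
    // jobId -> one slot per result partition; slots fill in lazily, so nulls are expected.
    private final Map<Long, ResultStateSketch[]> partitionResultStateMap = new HashMap<>();

    synchronized void register(long jobId, int partition, int nPartitions) {
        partitionResultStateMap
                .computeIfAbsent(jobId, k -> new ResultStateSketch[nPartitions])[partition] = new ResultStateSketch();
    }

    // Mirrors DatasetPartitionManager#abortReader: unknown job -> no-op; otherwise
    // abort every partition state created so far, skipping slots not yet filled.
    synchronized void abortReader(long jobId) {
        ResultStateSketch[] states = partitionResultStateMap.get(jobId);
        if (states == null) {
            return;
        }
        for (ResultStateSketch state : states) {
            if (state != null) {
                state.abort();
            }
        }
    }
}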
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 2abbed5..51cfc0d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -78,6 +78,8 @@
@Override
public void fail() throws HyracksDataException {
try {
+ resultState.closeAndDelete();
+ resultState.abort();
manager.reportPartitionFailure(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
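A rough sketch of the ordering this fail() now follows, with hypothetical stand-ins for the result state and the failure report (the real method also carries the result-set id and wraps everything in the try/catch above): presumably closeAndDelete() discards the partially written result file, abort() wakes any reader already attached to this partition, and only then is the failure reported upstream.

// Hypothetical stand-ins; the real writer works against a file-backed ResultState
// and IDatasetPartitionManager#reportPartitionFailure.
interface FailableResultState {
    void closeAndDelete(); // drop the partially written result
    void abort();          // set the failed flag and wake blocked readers
}

final class WriterFailSketch {
    // Mirrors DatasetPartitionWriter#fail(): clean up locally before reporting upstream.
    static void fail(FailableResultState resultState, Runnable reportPartitionFailure) {
        resultState.closeAndDelete();
        resultState.abort();
        reportPartitionFailure.run();
    }
}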
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index ce37f33..73044a1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -45,6 +45,8 @@
private final AtomicBoolean eos;
+ private final AtomicBoolean failed;
+
private final List<Page> localPageList;
private final List<Integer> inMemoryList;
@@ -70,6 +72,7 @@
this.fileFactory = fileFactory;
this.frameSize = frameSize;
eos = new AtomicBoolean(false);
+ failed = new AtomicBoolean(false);
localPageList = new ArrayList<Page>();
inMemoryList = new ArrayList<Integer>();
@@ -146,7 +149,7 @@
throws HyracksDataException {
long readSize = 0;
synchronized (this) {
- while (offset >= size && !eos.get()) {
+ while (offset >= size && !eos.get() && !failed.get()) {
try {
wait();
} catch (InterruptedException e) {
@@ -155,7 +158,7 @@
}
}
- if (offset >= size && eos.get()) {
+ if ((offset >= size && eos.get()) || failed.get()) {
return readSize;
}
@@ -182,6 +185,11 @@
return readSize;
}
+ public synchronized void abort() {
+ failed.set(true);
+ notifyAll();
+ }
+
public synchronized Page returnPage() throws HyracksDataException {
Page page = removePage();
@@ -265,13 +273,17 @@
}
private void initReadFileHandle() throws HyracksDataException {
- while (fileRef == null) {
+ while (fileRef == null && !failed.get()) {
try {
wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
+ if (failed.get()) {
+ return;
+ }
+
readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
}
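The heart of the change is the guarded wait in ResultState: readers now park on three conditions instead of two, and abort() flips the new failed flag under the monitor before notifyAll(), so a parked reader re-checks the condition and returns instead of sleeping forever. A self-contained sketch of the pattern, with the file and paging machinery stripped out; addSize() and setEOS() are simplified, hypothetical names for the writer-side signals, not the real ResultState API.

import java.util.concurrent.atomic.AtomicBoolean;

// A stripped-down model of ResultState's blocking read: data arrives via addSize(),
// the writer marks end-of-stream with setEOS(), and abort() unblocks failed jobs.
final class AbortableResultBuffer {
    private final AtomicBoolean eos = new AtomicBoolean(false);
    private final AtomicBoolean failed = new AtomicBoolean(false);
    private long size;

    synchronized void addSize(long delta) { size += delta; notifyAll(); }

    synchronized void setEOS() { eos.set(true); notifyAll(); }

    // Same shape as ResultState#abort(): set the flag and wake every waiter.
    synchronized void abort() { failed.set(true); notifyAll(); }

    // Returns the number of bytes readable past 'offset', or 0 when the stream
    // has ended or the job failed -- the caller treats 0 as "stop reading".
    synchronized long read(long offset) throws InterruptedException {
        while (offset >= size && !eos.get() && !failed.get()) {
            wait();
        }
        if ((offset >= size && eos.get()) || failed.get()) {
            return 0;
        }
        return size - offset;
    }
}

Because failed is checked both in the wait guard and in the early-return test, a reader that is past the wait when abort() runs still bails out on its next call; initReadFileHandle() gets the same treatment so a reader never blocks waiting for a result file that will never be created.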
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index 8f8c032..a078f50 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -46,6 +46,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
}
+ ncs.getDatasetPartitionManager().abortReader(jobId);
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 3957934..8b9d15a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.nc.work;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.Task;
@@ -32,8 +33,9 @@
@Override
public void run() {
try {
- ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
- ncs.getId(), details);
+ JobId jobId = task.getJobletContext().getJobId();
+ ncs.getDatasetPartitionManager().abortReader(jobId);
+ ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), details);
} catch (Exception e) {
e.printStackTrace();
}
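Both work items follow the same order: release the node's own blocked readers first, then do the rest of the teardown (notify the cluster controller here, abort the joblet's tasks in AbortTasksWork). A rough sketch of that shape, with hypothetical stand-ins for the node controller service and its collaborators; the real work items extend AbstractWork and use NodeControllerService, JobId, and the cluster-controller proxy.

// Hypothetical stand-ins, illustration only.
interface ReaderAborter {
    void abortReader(long jobId);
}

interface ClusterNotifier {
    void notifyTaskFailure(long jobId, String details) throws Exception;
}

final class TaskFailureWorkSketch implements Runnable {
    private final long jobId;
    private final String details;
    private final ReaderAborter datasetPartitionManager;
    private final ClusterNotifier clusterController;

    TaskFailureWorkSketch(long jobId, String details, ReaderAborter dpm, ClusterNotifier cc) {
        this.jobId = jobId;
        this.details = details;
        this.datasetPartitionManager = dpm;
        this.clusterController = cc;
    }

    @Override
    public void run() {
        try {
            // Unblock local result readers before the (possibly slow) call to the CC.
            datasetPartitionManager.abortReader(jobId);
            clusterController.notifyTaskFailure(jobId, details);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}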