[ASTERIXDB-2271][RT] Remove Result Ref of Aborted Jobs
- user model changes: no
- storage format changes: no
- interface changes: yes
  IDatasetPartitionManager (-) abortAllReaders
Details:
- Currently, the same result reference can be reused by two
  different jobs. This change fixes the issue by removing the
  stale result references of aborted jobs.
- Abort a job's tasks before aborting its result readers, so
  that result generation stops before the readers are torn
  down.
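- In sketch form, the new abort ordering looks roughly like the
  following (a hedged paraphrase of the updated
  AbortAllJobsWork.doRun() from the diff below; the wrapper class
  AbortSketch and helper abortJobsOf() are hypothetical, and
  obtaining dpm via ncs.getDatasetPartitionManager() is an
  assumption):

    import org.apache.hyracks.api.control.CcId;
    import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
    import org.apache.hyracks.api.job.JobId;
    import org.apache.hyracks.api.job.JobStatus;
    import org.apache.hyracks.control.nc.Joblet;
    import org.apache.hyracks.control.nc.NodeControllerService;
    import org.apache.hyracks.control.nc.Task;
    import org.apache.hyracks.control.nc.work.CleanupJobletWork;

    final class AbortSketch {
        // Hypothetical helper: aborts every joblet owned by the
        // given CC in the order this change requires.
        static void abortJobsOf(NodeControllerService ncs, CcId ccId) {
            // Assumption: dpm is obtained from the node controller.
            IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
            for (Joblet joblet : ncs.getJobletMap().values()) {
                if (!joblet.getJobId().getCcId().equals(ccId)) {
                    continue;
                }
                // 1. Abort the tasks first so result generation stops.
                for (Task task : joblet.getTaskMap().values()) {
                    task.abort();
                }
                JobId jobId = joblet.getJobId();
                if (dpm != null) {
                    // 2. Abort any readers of the aborted job's results.
                    dpm.abortReader(jobId);
                    // 3. Sweep the job's result references so a later
                    //    job cannot pick up a stale one.
                    dpm.sweep(jobId);
                }
                ncs.getWorkQueue().schedule(
                        new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE));
            }
        }
    }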
Change-Id: I8170887e007d63b143ef08a3a8e149ab3866fcb1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2386
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index e6cf6d3..b1e203f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -39,8 +39,6 @@
void abortReader(JobId jobId);
- void abortAllReaders();
-
void close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index fb7308e..b7cf9a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.control.nc.dataset;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -65,7 +65,7 @@
} else {
datasetMemoryManager = null;
}
- partitionResultStateMap = new LinkedHashMap<>();
+ partitionResultStateMap = new HashMap<>();
executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
}
@@ -77,14 +77,11 @@
synchronized (this) {
dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
datasetMemoryManager, fileFactory, maxReads);
-
ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
-
ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
resultStates[partition] = dpw.getResultState();
}
-
- LOGGER.debug("Initialized partition writer: JobId: " + jobId + ":partition: " + partition);
+ LOGGER.debug("Initialized partition writer: JobId: {}:partition: {}", jobId, partition);
return dpw;
}
@@ -103,8 +100,8 @@
@Override
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
try {
- LOGGER.debug("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
- + ":partition: " + partition);
+ LOGGER.debug("Reporting partition write completion: JobId: {}:ResultSetId: {}:partition: {}", jobId, rsId,
+ partition);
ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition);
} catch (Exception e) {
throw HyracksException.create(e);
@@ -117,11 +114,11 @@
ResultState resultState = getResultState(jobId, resultSetId, partition);
DatasetPartitionReader dpr = new DatasetPartitionReader(this, datasetMemoryManager, executor, resultState);
dpr.writeTo(writer);
- LOGGER.debug("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: "
- + partition);
+ LOGGER.debug("Initialized partition reader: JobId: {}:ResultSetId: {}:partition: {}", jobId, resultSetId,
+ partition);
}
- protected synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition)
+ private synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition)
throws HyracksException {
ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
@@ -155,13 +152,6 @@
}
@Override
- public synchronized void abortAllReaders() {
- for (ResultSetMap rsIdMap : partitionResultStateMap.values()) {
- rsIdMap.abortAll();
- }
- }
-
- @Override
public synchronized void close() {
for (JobId jobId : getJobIds()) {
deinit(jobId);
@@ -175,7 +165,7 @@
}
@Override
- public ResultSetMap getState(JobId jobId) {
+ public synchronized ResultSetMap getState(JobId jobId) {
return partitionResultStateMap.get(jobId);
}
@@ -191,5 +181,4 @@
rsIdMap.closeAndDeleteAll();
}
}
-
}
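In the hunks above, getState() is now synchronized and the backing
map drops LinkedHashMap for a plain HashMap, presumably because the
only insertion-order iteration (abortAllReaders) is gone. A minimal
sketch of the locking discipline this relies on (hypothetical class
for illustration only, not part of the patch):

    import java.util.HashMap;
    import java.util.Map;

    // A plain HashMap is safe here only because every access,
    // reads included, holds the same instance monitor.
    final class StateMapSketch<K, V> {
        private final Map<K, V> states = new HashMap<>();

        synchronized void put(K key, V value) {
            states.put(key, value);
        }

        // An unsynchronized get(), like the old getState(), could
        // observe the map mid-rehash under concurrent writes;
        // synchronizing it closes that race.
        synchronized V get(K key) {
            return states.get(key);
        }

        synchronized void remove(K key) {
            states.remove(key);
        }
    }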
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 68d677f4..2bcf414 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.common.work.SynchronizableWork;
import org.apache.hyracks.control.nc.Joblet;
@@ -50,19 +51,18 @@
LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId());
}
Collection<Joblet> joblets = ncs.getJobletMap().values();
- for (Joblet ji : joblets) {
- // TODO(mblow): should we have one jobletmap per cc?
- if (!ji.getJobId().getCcId().equals(ccId)) {
- continue;
- }
- if (dpm != null) {
- dpm.abortReader(ji.getJobId());
- }
- Collection<Task> tasks = ji.getTaskMap().values();
+ // TODO(mblow): should we have one jobletmap per cc?
+ joblets.stream().filter(joblet -> joblet.getJobId().getCcId().equals(ccId)).forEach(joblet -> {
+ Collection<Task> tasks = joblet.getTaskMap().values();
for (Task task : tasks) {
task.abort();
}
- ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, ji.getJobId(), JobStatus.FAILURE));
- }
+ final JobId jobId = joblet.getJobId();
+ if (dpm != null) {
+ dpm.abortReader(jobId);
+ dpm.sweep(jobId);
+ }
+ ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE));
+ });
}
}