For synchronous queries the partition should be removed as soon as it is read.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index e370949..1730165 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -32,6 +32,8 @@
public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
throws HyracksException;
+ public void removePartition(JobId jobId, ResultSetId resultSetId, int partition);
+
public void abortReader(JobId jobId);
public IWorkspaceFileFactory getFileFactory();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 2399320..6c10cd4 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -151,13 +151,42 @@
}
}
- DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+ DatasetPartitionReader dpr = new DatasetPartitionReader(this, datasetMemoryManager, executor, resultState);
dpr.writeTo(writer);
LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: "
+ partition);
}
@Override
+ public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
+ ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+ if (rsIdMap != null) {
+ ResultState[] resultStates = rsIdMap.get(resultSetId);
+ if (resultStates != null) {
+ ResultState state = resultStates[partition];
+ if (state != null) {
+ state.closeAndDelete();
+ LOGGER.fine("Removing partition: " + partition + " for JobId: " + jobId);
+ }
+ resultStates[partition] = null;
+ boolean stateEmpty = true;
+ for (int i = 0; i < resultStates.length; i++) {
+ if (resultStates[i] != null) {
+ stateEmpty = false;
+ break;
+ }
+ }
+ if (stateEmpty) {
+ rsIdMap.remove(resultSetId);
+ }
+ }
+ if (rsIdMap.isEmpty()) {
+ partitionResultStateMap.remove(jobId);
+ }
+ }
+ }
+
+ @Override
public synchronized void abortReader(JobId jobId) {
Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index 07624de..a4a7130 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -26,13 +26,17 @@
public class DatasetPartitionReader {
private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
+ private final DatasetPartitionManager datasetPartitionManager;
+
private final DatasetMemoryManager datasetMemoryManager;
private final Executor executor;
private final ResultState resultState;
- public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
+ public DatasetPartitionReader(DatasetPartitionManager datasetPartitionManager,
+ DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
+ this.datasetPartitionManager = datasetPartitionManager;
this.datasetMemoryManager = datasetMemoryManager;
this.executor = executor;
this.resultState = resultState;
@@ -66,6 +70,9 @@
} finally {
channel.close();
resultState.readClose();
+ datasetPartitionManager.removePartition(resultState.getResultSetPartitionId().getJobId(),
+ resultState.getResultSetPartitionId().getResultSetId(), resultState
+ .getResultSetPartitionId().getPartition());
}
} catch (HyracksDataException e) {
throw new RuntimeException(e);