Remove all result distribution states and delete the underlying files when shutting down.
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 2ec7acc..6d2c043 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -61,22 +62,10 @@
partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
private static final long serialVersionUID = 1L;
- protected boolean removeEldestEntry(Map.Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
+ protected boolean removeEldestEntry(Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
synchronized (DatasetPartitionManager.this) {
if (size() > resultHistorySize) {
- Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(eldest.getValue());
- for (ResultSetId rsId : rsIdMap.keySet()) {
- ResultState[] resultStates = rsIdMap.get(rsId);
- if (resultStates != null) {
- for (int i = 0; i < resultStates.length; i++) {
- ResultState state = resultStates[i];
- if (state != null) {
- state.closeAndDelete();
- LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
- }
- }
- }
- }
+ deinitState(eldest);
return true;
}
return false;
@@ -193,7 +182,28 @@
}
@Override
- public void close() {
+ public synchronized void close() {
+ for (Entry<JobId, Map<ResultSetId, ResultState[]>> entry : partitionResultStateMap.entrySet()) {
+ deinitState(entry);
+ }
deallocatableRegistry.close();
}
+
+ public void deinitState(Entry<JobId, Map<ResultSetId, ResultState[]>> entry) {
+ Map<ResultSetId, ResultState[]> rsIdMap = entry.getValue();
+ if (rsIdMap != null) {
+ for (ResultSetId rsId : rsIdMap.keySet()) {
+ ResultState[] resultStates = rsIdMap.get(rsId);
+ if (resultStates != null) {
+ for (int i = 0; i < resultStates.length; i++) {
+ ResultState state = resultStates[i];
+ if (state != null) {
+ state.closeAndDelete();
+ LOGGER.fine("Removing partition: " + i + " for JobId: " + entry.getKey());
+ }
+ }
+ }
+ }
+ }
+ }
}