Remove all result distribution states and delete the underlying files when shutting down.
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 2ec7acc..6d2c043 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -17,6 +17,7 @@
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Executor;
 import java.util.logging.Logger;
 
@@ -61,22 +62,10 @@
         partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
             private static final long serialVersionUID = 1L;
 
-            protected boolean removeEldestEntry(Map.Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
+            protected boolean removeEldestEntry(Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
                 synchronized (DatasetPartitionManager.this) {
                     if (size() > resultHistorySize) {
-                        Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(eldest.getValue());
-                        for (ResultSetId rsId : rsIdMap.keySet()) {
-                            ResultState[] resultStates = rsIdMap.get(rsId);
-                            if (resultStates != null) {
-                                for (int i = 0; i < resultStates.length; i++) {
-                                    ResultState state = resultStates[i];
-                                    if (state != null) {
-                                        state.closeAndDelete();
-                                        LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
-                                    }
-                                }
-                            }
-                        }
+                        deinitState(eldest);
                         return true;
                     }
                     return false;
@@ -193,7 +182,28 @@
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
+        for (Entry<JobId, Map<ResultSetId, ResultState[]>> entry : partitionResultStateMap.entrySet()) {
+            deinitState(entry);
+        }
         deallocatableRegistry.close();
     }
+
+    public void deinitState(Entry<JobId, Map<ResultSetId, ResultState[]>> entry) {
+        Map<ResultSetId, ResultState[]> rsIdMap = entry.getValue();
+        if (rsIdMap != null) {
+            for (ResultSetId rsId : rsIdMap.keySet()) {
+                ResultState[] resultStates = rsIdMap.get(rsId);
+                if (resultStates != null) {
+                    for (int i = 0; i < resultStates.length; i++) {
+                        ResultState state = resultStates[i];
+                        if (state != null) {
+                            state.closeAndDelete();
+                            LOGGER.fine("Removing partition: " + i + " for JobId: " + entry.getKey());
+                        }
+                    }
+                }
+            }
+        }
+    }
 }