Implement an NC-side sweeper thread to clean up result distribution states and files.
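
This replaces the bounded result-history policy (resultHistorySize), which
evicted the eldest job's result state once the map grew past a fixed size,
with time-based cleanup: each job's result-set map is stamped when it is
created, and a sweeper task on the NC's executor wakes up every
resultSweepThreshold milliseconds, then deinitializes and removes every job
whose results are older than resultTTL milliseconds.

The DatasetPartitionManager constructor now reads ncConfig.resultTTL and
ncConfig.resultSweepThreshold, so NCConfig needs matching fields; that file
is not part of this diff. As a rough sketch, assuming NCConfig's usual args4j
style (the option names and defaults below are illustrative assumptions, not
the actual declarations):

    // Sketch only: hypothetical NCConfig additions, not part of this diff.
    @Option(name = "-result-time-to-live",
            usage = "How long (in ms) query results are retained on the NC (default: 24 hours)")
    public long resultTTL = 86400000;

    @Option(name = "-result-sweep-threshold",
            usage = "Time (in ms) between successive result sweeps (default: 1 minute)")
    public long resultSweepThreshold = 60000;
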
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index f82aa37..46f65c0 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -217,7 +217,7 @@
     private void init() throws Exception {
         ctx.getIOManager().setExecutor(executor);
         datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
-                ncConfig.resultHistorySize);
+                ncConfig.resultTTL, ncConfig.resultSweepThreshold);
         datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
                 datasetPartitionManager, ncConfig.nNetThreads);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 6c10cd4..a0e78f8 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,11 +14,14 @@
  */
 package edu.uci.ics.hyracks.control.nc.dataset;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.Executor;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -40,7 +43,7 @@
 
     private final Executor executor;
 
-    private final Map<JobId, Map<ResultSetId, ResultState[]>> partitionResultStateMap;
+    private final Map<JobId, ResultSetMap> partitionResultStateMap;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
@@ -48,8 +51,8 @@
 
     private final DatasetMemoryManager datasetMemoryManager;
 
-    public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory,
-            final int resultHistorySize) {
+    public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
+            long resultSweepThreshold) {
         this.ncs = ncs;
         this.executor = executor;
         deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -59,19 +62,9 @@
         } else {
             datasetMemoryManager = null;
         }
-        partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
-            private static final long serialVersionUID = 1L;
-
-            protected boolean removeEldestEntry(Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
-                synchronized (DatasetPartitionManager.this) {
-                    if (size() > resultHistorySize) {
-                        deinitState(eldest);
-                        return true;
-                    }
-                    return false;
-                }
-            }
-        };
+        partitionResultStateMap = new LinkedHashMap<JobId, ResultSetMap>();
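+        // The sweeper loops for the lifetime of the NC, permanently occupying one of the executor's threads.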
+        executor.execute(new Sweeper(resultTTL, resultSweepThreshold));
     }
 
     @Override
@@ -86,9 +78,9 @@
                 dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, partition, datasetMemoryManager,
                         fileFactory);
 
-                Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+                ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
                 if (rsIdMap == null) {
-                    rsIdMap = new HashMap<ResultSetId, ResultState[]>();
+                    rsIdMap = new ResultSetMap();
                     partitionResultStateMap.put(jobId, rsIdMap);
                 }
 
@@ -134,7 +126,7 @@
             IFrameWriter writer) throws HyracksException {
         ResultState resultState;
         synchronized (this) {
-            Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+            ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
 
             if (rsIdMap == null) {
                 throw new HyracksException("Unknown JobId " + jobId);
@@ -188,7 +180,7 @@
 
     @Override
     public synchronized void abortReader(JobId jobId) {
-        Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
 
         if (rsIdMap == null) {
             return;
@@ -213,14 +205,14 @@
 
     @Override
     public synchronized void close() {
-        for (Entry<JobId, Map<ResultSetId, ResultState[]>> entry : partitionResultStateMap.entrySet()) {
+        for (Entry<JobId, ResultSetMap> entry : partitionResultStateMap.entrySet()) {
             deinitState(entry);
         }
         deallocatableRegistry.close();
     }
 
-    public void deinitState(Entry<JobId, Map<ResultSetId, ResultState[]>> entry) {
-        Map<ResultSetId, ResultState[]> rsIdMap = entry.getValue();
+    private void deinitState(Entry<JobId, ResultSetMap> entry) {
+        ResultSetMap rsIdMap = entry.getValue();
         if (rsIdMap != null) {
             for (ResultSetId rsId : rsIdMap.keySet()) {
                 ResultState[] resultStates = rsIdMap.get(rsId);
@@ -236,4 +228,70 @@
             }
         }
     }
+
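+    // A per-job map of result set ids to partition states, stamped with its creation
+    // time so the sweeper can decide when the job's results have outlived the TTL.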
+    private static class ResultSetMap extends HashMap<ResultSetId, ResultState[]> {
+        private static final long serialVersionUID = 1L;
+
+        private final long timestamp;
+
+        public ResultSetMap() {
+            super();
+            timestamp = System.currentTimeMillis();
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    private class Sweeper implements Runnable {
+        private final long resultTTL;
+
+        private final long resultSweepThreshold;
+
+        private final List<JobId> toBeCollected;
+
+        public Sweeper(long resultTTL, long resultSweepThreshold) {
+            this.resultTTL = resultTTL;
+            this.resultSweepThreshold = resultSweepThreshold;
+            toBeCollected = new ArrayList<JobId>();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(resultSweepThreshold);
+                    sweep();
+                } catch (InterruptedException e) {
+                    // There isn't much we can do here other than log it: the sweeper
+                    // must stay alive, or expired results would never be reclaimed.
+                    LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
+                }
+            }
+        }
+
+        private void sweep() {
+            toBeCollected.clear();
+            synchronized (DatasetPartitionManager.this) {
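+                // First pass: deinitialize expired jobs and remember their ids; removing
+                // map entries here would invalidate the iterator over the entry set.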
+                for (Map.Entry<JobId, ResultSetMap> entry : partitionResultStateMap.entrySet()) {
+                    if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
+                        toBeCollected.add(entry.getKey());
+                        deinitState(entry);
+                    }
+                }
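+                // Second pass: drop the collected jobs from the map.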
+                for (JobId jobId : toBeCollected) {
+                    partitionResultStateMap.remove(jobId);
+                }
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Result state sweep completed successfully.");
+            }
+        }
+    }
 }