Implement a CC side sweeper thread to clean up result distribution Directory Service states related to results.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index 563ee1b..f010aa9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -26,11 +26,14 @@
 
     private static final long serialVersionUID = 1L;
 
+    private final long timestamp;
+
     private Status status;
 
     private List<Exception> exceptions;
 
     public DatasetJobRecord() {
+        this.timestamp = System.currentTimeMillis();
         this.status = Status.RUNNING;
     }
 
@@ -51,6 +54,10 @@
         this.exceptions = exceptions;
     }
 
+    public long getTimestamp() {
+        return timestamp;
+    }
+
     public Status getStatus() {
         return status;
     }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index d152cf5..ab2df55 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.dataset;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
@@ -23,6 +24,8 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetDirectoryService extends IJobLifecycleListener {
+    public void init(ExecutorService executor);
+
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
             int nPartitions, NetworkAddress networkAddress);
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 1e78f09..24cb543 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -188,7 +188,7 @@
             }
         };
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService(ccConfig.jobHistorySize);
+        datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
         jobCounter = 0;
 
         deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
@@ -220,6 +220,8 @@
         timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
         jobLog.open();
         startApplication();
+
+        datasetDirectoryService.init(executor);
         LOGGER.log(Level.INFO, "Started ClusterControllerService");
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 36082b0..d7b384b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -14,10 +14,14 @@
  */
 package edu.uci.ics.hyracks.control.cc.dataset;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -39,16 +43,23 @@
  * job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
+    private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
+
+    private final long resultTTL;
+
+    private final long resultSweepThreshold;
+
     private final Map<JobId, DatasetJobRecord> jobResultLocations;
 
-    public DatasetDirectoryService(final int jobHistorySize) {
-        jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>() {
-            private static final long serialVersionUID = 1L;
+    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
+        this.resultTTL = resultTTL;
+        this.resultSweepThreshold = resultSweepThreshold;
+        jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>();
+    }
 
-            protected boolean removeEldestEntry(Map.Entry<JobId, DatasetJobRecord> eldest) {
-                return size() > jobHistorySize;
-            }
-        };
+    @Override
+    public void init(ExecutorService executor) {
+        executor.execute(new Sweeper(resultTTL, resultSweepThreshold));
     }
 
     @Override
@@ -243,4 +254,49 @@
         }
         return null;
     }
+
+    class Sweeper implements Runnable {
+        private final long resultTTL;
+
+        private final long resultSweepThreshold;
+
+        private final List<JobId> toBeCollected;
+
+        public Sweeper(long resultTTL, long resultSweepThreshold) {
+            this.resultTTL = resultTTL;
+            this.resultSweepThreshold = resultSweepThreshold;
+            toBeCollected = new ArrayList<JobId>();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(resultSweepThreshold);
+                    sweep();
+                } catch (InterruptedException e) {
+                    LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
+                    // There isn't much we can do really here
+                }
+            }
+
+        }
+
+        private void sweep() {
+            synchronized (DatasetDirectoryService.this) {
+                toBeCollected.clear();
+                for (Map.Entry<JobId, DatasetJobRecord> entry : jobResultLocations.entrySet()) {
+                    if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
+                        toBeCollected.add(entry.getKey());
+                    }
+                }
+                for (JobId jobId : toBeCollected) {
+                    jobResultLocations.remove(jobId);
+                }
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Result state cleanup instance successfully completed.");
+            }
+        }
+    }
 }
\ No newline at end of file