Add support for stopping the ResultStateSweeper thread in to the classes that create it.
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 0463350..629ac5d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -242,6 +242,7 @@
     @Override
     public void stop() throws Exception {
         LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
+        datasetDirectoryService.stop();
         executor.shutdownNow();
         webServer.stop();
         sweeper.cancel();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ee1cc67..7b060c7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -19,6 +19,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -41,21 +42,28 @@
  * job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
+    private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
+
     private final long resultTTL;
 
     private final long resultSweepThreshold;
 
     private final Map<JobId, IDatasetStateRecord> jobResultLocations;
 
+    private ResultStateSweeper resultStateSweeper;
+
     public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
         jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
+        resultStateSweeper = null;
     }
 
     @Override
     public void init(ExecutorService executor) {
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+        LOGGER.info("New ResultStateSweepser in DatasetDirectoryService.");
+        resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
+        executor.execute(resultStateSweeper);
     }
 
     @Override
@@ -260,4 +268,9 @@
         }
         return null;
     }
+
+    @Override
+    public void stop() {
+        resultStateSweeper.close();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 3de3c50..05ed177 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -42,4 +42,6 @@
 
     public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
+
+    public void stop();
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
index 69b560c..a5ad630 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -39,16 +39,19 @@
 
     private final List<JobId> toBeCollected;
 
+    private boolean running;
+
     public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
         this.datasetManager = datasetManager;
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
         toBeCollected = new ArrayList<JobId>();
+        running = true;
     }
 
     @Override
     public void run() {
-        while (true) {
+        while (running) {
             try {
                 Thread.sleep(resultSweepThreshold);
                 sweep();
@@ -57,9 +60,14 @@
                 // There isn't much we can do really here
             }
         }
-
+        LOGGER.info("Result cleaner thread has stopped.");
     }
 
+    public void close() {
+        running = false;
+        LOGGER.info("Result cleaner thread has been flagged to stop.");
+   }
+
     private void sweep() {
         synchronized (datasetManager) {
             toBeCollected.clear();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 325e2bc..b28ffaa 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -50,6 +50,8 @@
 
     private final DatasetMemoryManager datasetMemoryManager;
 
+    private final ResultStateSweeper resultStateSweeper;
+
     public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
             long resultSweepThreshold) {
         this.ncs = ncs;
@@ -62,7 +64,8 @@
             datasetMemoryManager = null;
         }
         partitionResultStateMap = new LinkedHashMap<JobId, IDatasetStateRecord>();
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+        resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
+        executor.execute(resultStateSweeper);
     }
 
     @Override
@@ -208,6 +211,7 @@
             deinit(entry.getKey());
         }
         deallocatableRegistry.close();
+        resultStateSweeper.close();
     }
 
     @Override