Refactor out the result state sweeper code.
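
Both the CC-side DatasetDirectoryService and the NC-side DatasetPartitionManager
carried their own copy of a Sweeper inner class that periodically drops result
state older than the configured TTL. Move that logic into a single
ResultStateSweeper in hyracks-control-common and run it against two new
interfaces: IDatasetManager, which exposes the per-job state map and per-job
cleanup, and IDatasetStateRecord, which exposes the timestamp the TTL check
compares against.

For illustration only (this class is not part of the patch; its name and fields
are invented), any component that keeps per-job result state can now reuse the
sweeper by implementing IDatasetManager:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;

    import edu.uci.ics.hyracks.api.dataset.IDatasetManager;
    import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
    import edu.uci.ics.hyracks.api.job.JobId;
    import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;

    public class InMemoryResultManager implements IDatasetManager {
        private final Map<JobId, IDatasetStateRecord> states = new LinkedHashMap<JobId, IDatasetStateRecord>();

        public void init(ExecutorService executor, long resultTTL, long resultSweepThreshold) {
            // The sweeper sleeps resultSweepThreshold ms between passes and calls
            // deinitState() for every record older than resultTTL ms.
            executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
        }

        @Override
        public Map<JobId, IDatasetStateRecord> getStateMap() {
            return states;
        }

        @Override
        public synchronized void deinitState(JobId jobId) {
            states.remove(jobId);
        }
    }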
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index f010aa9..26f0b7a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -17,7 +17,7 @@
 import java.util.HashMap;
 import java.util.List;
 
-public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
+public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> implements IDatasetStateRecord {
     public enum Status {
         RUNNING,
         SUCCESS,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index ab2df55..5380d60 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -23,7 +23,7 @@
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.JobId;
 
-public interface IDatasetDirectoryService extends IJobLifecycleListener {
+public interface IDatasetDirectoryService extends IJobLifecycleListener, IDatasetManager {
     public void init(ExecutorService executor);
 
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
new file mode 100644
index 0000000..b6cd1af
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+
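+/**
+ * Provides the shared ResultStateSweeper uniform access to the per-job dataset
+ * state held by a dataset manager so that stale state can be swept on a TTL basis.
+ */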
+public interface IDatasetManager {
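+    /**
+     * @return the live map of per-job dataset state, keyed by job id; the
+     *         sweeper scans it while synchronized on this manager.
+     */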
+    public Map<JobId, IDatasetStateRecord> getStateMap();
+
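+    /**
+     * Removes and releases all result state associated with the given job.
+     */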
+    public void deinitState(JobId jobId);
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 1730165..0935211 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -20,7 +20,7 @@
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 
-public interface IDatasetPartitionManager {
+public interface IDatasetPartitionManager extends IDatasetManager {
     public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
             boolean asyncMode, int partition, int nPartitions) throws HyracksException;
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
new file mode 100644
index 0000000..1a400e3
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
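+/**
+ * A per-job record of dataset state whose age determines when it is swept.
+ */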
+public interface IDatasetStateRecord {
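+    /**
+     * @return the timestamp, in milliseconds since the epoch, that the sweeper
+     *         compares against the result TTL.
+     */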
+    public long getTimestamp();
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index d7b384b..a03d79f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -14,26 +14,25 @@
  */
 package edu.uci.ics.hyracks.control.cc.dataset;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
 
 /**
  * TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
@@ -43,29 +42,28 @@
  * job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
-    private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
-
     private final long resultTTL;
 
     private final long resultSweepThreshold;
 
-    private final Map<JobId, DatasetJobRecord> jobResultLocations;
+    private final Map<JobId, IDatasetStateRecord> jobResultLocations;
 
     public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
-        jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>();
+        jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
     }
 
     @Override
     public void init(ExecutorService executor) {
-        executor.execute(new Sweeper(resultTTL, resultSweepThreshold));
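+        // Start the shared sweeper thread, which evicts job records older than resultTTL.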
+        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
     }
 
     @Override
     public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
             throws HyracksException {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
         if (djr == null) {
             djr = new DatasetJobRecord();
             jobResultLocations.put(jobId, djr);
@@ -85,7 +82,7 @@
     @Override
     public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
             int partition, int nPartitions, NetworkAddress networkAddress) {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
 
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
         if (resultSetMetaData == null) {
@@ -106,7 +103,7 @@
     public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
         int successCount = 0;
 
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
         DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
         records[partition].writeEOS();
@@ -124,7 +121,7 @@
 
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
         if (djr != null) {
             djr.fail();
         }
@@ -133,7 +130,7 @@
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
         if (djr != null) {
             djr.fail(exceptions);
         }
@@ -143,7 +140,7 @@
     @Override
     public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
         DatasetJobRecord djr;
-        while ((djr = jobResultLocations.get(jobId)) == null) {
+        while ((djr = (DatasetJobRecord) jobResultLocations.get(jobId)) == null) {
             try {
                 wait();
             } catch (InterruptedException e) {
@@ -155,6 +152,16 @@
     }
 
     @Override
+    public Map<JobId, IDatasetStateRecord> getStateMap() {
+        return jobResultLocations;
+    }
+
+    @Override
+    public void deinitState(JobId jobId) {
+        jobResultLocations.remove(jobId);
+    }
+
+    @Override
     public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
         DatasetDirectoryRecord[] newRecords;
@@ -202,7 +209,7 @@
      */
     private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
 
         if (djr == null) {
             throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
@@ -254,49 +261,4 @@
         }
         return null;
     }
-
-    class Sweeper implements Runnable {
-        private final long resultTTL;
-
-        private final long resultSweepThreshold;
-
-        private final List<JobId> toBeCollected;
-
-        public Sweeper(long resultTTL, long resultSweepThreshold) {
-            this.resultTTL = resultTTL;
-            this.resultSweepThreshold = resultSweepThreshold;
-            toBeCollected = new ArrayList<JobId>();
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    Thread.sleep(resultSweepThreshold);
-                    sweep();
-                } catch (InterruptedException e) {
-                    LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
-                    // There isn't much we can do really here
-                }
-            }
-
-        }
-
-        private void sweep() {
-            synchronized (DatasetDirectoryService.this) {
-                toBeCollected.clear();
-                for (Map.Entry<JobId, DatasetJobRecord> entry : jobResultLocations.entrySet()) {
-                    if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
-                        toBeCollected.add(entry.getKey());
-                    }
-                }
-                for (JobId jobId : toBeCollected) {
-                    jobResultLocations.remove(jobId);
-                }
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Result state cleanup instance successfully completed.");
-            }
-        }
-    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
new file mode 100644
index 0000000..69b560c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.dataset;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * Sweeper to clean up the stale result distribution files and result states.
+ */
+public class ResultStateSweeper implements Runnable {
+    private static final Logger LOGGER = Logger.getLogger(ResultStateSweeper.class.getName());
+
+    private final IDatasetManager datasetManager;
+
+    private final long resultTTL;
+
+    private final long resultSweepThreshold;
+
+    private final List<JobId> toBeCollected;
+
+    public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
+        this.datasetManager = datasetManager;
+        this.resultTTL = resultTTL;
+        this.resultSweepThreshold = resultSweepThreshold;
+        toBeCollected = new ArrayList<JobId>();
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                Thread.sleep(resultSweepThreshold);
+                sweep();
+            } catch (InterruptedException e) {
+                LOGGER.severe("Result cleaner thread interrupted, but the sweeper keeps running.");
+                // There isn't much we can do really here
+            }
+        }
+    }
+
+    private void sweep() {
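+        // Both dataset managers synchronize their public methods on themselves, so
+        // holding the manager's monitor makes this scan-and-remove atomic with them.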
+        synchronized (datasetManager) {
+            toBeCollected.clear();
+            for (Map.Entry<JobId, IDatasetStateRecord> entry : datasetManager.getStateMap().entrySet()) {
+                if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
+                    toBeCollected.add(entry.getKey());
+                }
+            }
+            for (JobId jobId : toBeCollected) {
+                datasetManager.deinitState(jobId);
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Result state cleanup pass completed.");
+        }
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index a0e78f8..1299593 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,23 +14,22 @@
  */
 package edu.uci.ics.hyracks.control.nc.dataset;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.Executor;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -43,7 +42,7 @@
 
     private final Executor executor;
 
-    private final Map<JobId, ResultSetMap> partitionResultStateMap;
+    private final Map<JobId, IDatasetStateRecord> partitionResultStateMap;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
@@ -62,8 +61,8 @@
         } else {
             datasetMemoryManager = null;
         }
-        partitionResultStateMap = new LinkedHashMap<JobId, ResultSetMap>();
-        executor.execute(new Sweeper(resultTTL, resultSweepThreshold));
+        partitionResultStateMap = new LinkedHashMap<JobId, IDatasetStateRecord>();
+        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
     }
 
     @Override
@@ -78,7 +77,7 @@
                 dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, partition, datasetMemoryManager,
                         fileFactory);
 
-                ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+                ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
                 if (rsIdMap == null) {
                     rsIdMap = new ResultSetMap();
                     partitionResultStateMap.put(jobId, rsIdMap);
@@ -126,7 +125,7 @@
             IFrameWriter writer) throws HyracksException {
         ResultState resultState;
         synchronized (this) {
-            ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+            ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
 
             if (rsIdMap == null) {
                 throw new HyracksException("Unknown JobId " + jobId);
@@ -151,7 +150,7 @@
 
     @Override
     public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
-        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
         if (rsIdMap != null) {
             ResultState[] resultStates = rsIdMap.get(resultSetId);
             if (resultStates != null) {
@@ -180,7 +179,7 @@
 
     @Override
     public synchronized void abortReader(JobId jobId) {
-        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
 
         if (rsIdMap == null) {
             return;
@@ -205,14 +204,24 @@
 
     @Override
     public synchronized void close() {
-        for (Entry<JobId, ResultSetMap> entry : partitionResultStateMap.entrySet()) {
-            deinitState(entry);
+        // Iterate over a snapshot: deinitState() now removes the job's entry from
+        // partitionResultStateMap, which would break a live entrySet() iteration.
+        for (Entry<JobId, IDatasetStateRecord> entry : new LinkedHashMap<JobId, IDatasetStateRecord>(
+                partitionResultStateMap).entrySet()) {
+            deinitState(entry.getKey());
         }
         deallocatableRegistry.close();
     }
 
-    private void deinitState(Entry<JobId, ResultSetMap> entry) {
-        ResultSetMap rsIdMap = entry.getValue();
+    @Override
+    public Map<JobId, IDatasetStateRecord> getStateMap() {
+        return partitionResultStateMap;
+    }
+
+    @Override
+    public void deinitState(JobId jobId) {
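+        // Drop the job's entry, then close and delete each partition's result file.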
+        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.remove(jobId);
         if (rsIdMap != null) {
             for (ResultSetId rsId : rsIdMap.keySet()) {
                 ResultState[] resultStates = rsIdMap.get(rsId);
@@ -221,7 +226,7 @@
                         ResultState state = resultStates[i];
                         if (state != null) {
                             state.closeAndDelete();
-                            LOGGER.fine("Removing partition: " + i + " for JobId: " + entry.getKey());
+                            LOGGER.fine("Removing partition: " + i + " for JobId: " + jobId);
                         }
                     }
                 }
@@ -229,7 +234,7 @@
         }
     }
 
-    private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> {
+    private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> implements IDatasetStateRecord {
         private static final long serialVersionUID = 1L;
 
         long timestamp;
@@ -243,50 +248,4 @@
             return timestamp;
         }
     }
-
-    class Sweeper implements Runnable {
-        private final long resultTTL;
-
-        private final long resultSweepThreshold;
-
-        private final List<JobId> toBeCollected;
-
-        public Sweeper(long resultTTL, long resultSweepThreshold) {
-            this.resultTTL = resultTTL;
-            this.resultSweepThreshold = resultSweepThreshold;
-            toBeCollected = new ArrayList<JobId>();
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    Thread.sleep(resultSweepThreshold);
-                    sweep();
-                } catch (InterruptedException e) {
-                    LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
-                    // There isn't much we can do really here
-                }
-            }
-
-        }
-
-        private void sweep() {
-            toBeCollected.clear();
-            synchronized (DatasetPartitionManager.this) {
-                for (Map.Entry<JobId, ResultSetMap> entry : partitionResultStateMap.entrySet()) {
-                    if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
-                        toBeCollected.add(entry.getKey());
-                        deinitState(entry);
-                    }
-                }
-                for (JobId jobId : toBeCollected) {
-                    partitionResultStateMap.remove(jobId);
-                }
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Result state cleanup instance successfully completed.");
-            }
-        }
-    }
 }