diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index 55f1d7c..4e7ddda 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -84,7 +84,7 @@
     private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
 
     public DatasetJobRecord() {
-        this.timestamp = System.currentTimeMillis();
+        this.timestamp = System.nanoTime();
         this.status = new Status();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
index c8463d3..a0c1f78 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
@@ -24,11 +24,15 @@
 
 public interface IDatasetManager {
 
-    public Set<JobId> getJobIds();
+    Set<JobId> getJobIds();
 
-    public IDatasetStateRecord getState(JobId jobId);
+    IDatasetStateRecord getState(JobId jobId);
 
-    public void deinitState(JobId jobId);
+    void sweep(JobId jobId);
 
-    public long getResultTimestamp(JobId jobId);
-}
+    /**
+     * Removes all references and deletes persisted files for
+     * all expired datasets.
+     */
+    void sweepExpiredDatasets();
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index a57baf5..04aaddd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.common.dataset.AbstractDatasetManager;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.logging.log4j.Level;
@@ -53,25 +54,23 @@
  * the job (after it receives all the results) completely. Then we can just get rid of the location information for that
  * job.
  */
-public class DatasetDirectoryService implements IDatasetDirectoryService {
+public class DatasetDirectoryService extends AbstractDatasetManager implements IDatasetDirectoryService {
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private final long resultTTL;
-
     private final long resultSweepThreshold;
 
     private final Map<JobId, JobResultInfo> jobResultLocations;
 
     public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
-        this.resultTTL = resultTTL;
+        super(resultTTL);
         this.resultSweepThreshold = resultSweepThreshold;
-        jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
+        jobResultLocations = new LinkedHashMap<>();
     }
 
     @Override
     public void init(ExecutorService executor) {
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
+        executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
     }
 
     @Override
@@ -181,12 +180,7 @@
     }
 
     @Override
-    public synchronized long getResultTimestamp(JobId jobId) {
-        return getState(jobId).getTimestamp();
-    }
-
-    @Override
-    public synchronized void deinitState(JobId jobId) {
+    public synchronized void sweep(JobId jobId) {
         jobResultLocations.remove(jobId);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java
new file mode 100644
index 0000000..f95229e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/AbstractDatasetManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.dataset;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.dataset.IDatasetManager;
+import org.apache.hyracks.api.dataset.IDatasetStateRecord;
+import org.apache.hyracks.api.job.JobId;
+
+public abstract class AbstractDatasetManager implements IDatasetManager {
+
+    private final long nanoResultTTL;
+
+    protected AbstractDatasetManager(long resultTTL) {
+        this.nanoResultTTL = TimeUnit.MILLISECONDS.toNanos(resultTTL);
+    }
+
+    @Override
+    public synchronized void sweepExpiredDatasets() {
+        final List<JobId> expiredDatasets = new ArrayList<>();
+        final long sweepTime = System.nanoTime();
+        for (JobId jobId : getJobIds()) {
+            final IDatasetStateRecord state = getState(jobId);
+            if (state != null && hasExpired(state, sweepTime, nanoResultTTL)) {
+                expiredDatasets.add(jobId);
+            }
+        }
+        for (JobId jobId : expiredDatasets) {
+            sweep(jobId);
+        }
+    }
+
+    private static boolean hasExpired(IDatasetStateRecord dataset, long currentTime, long ttl) {
+        return currentTime - dataset.getTimestamp() - ttl > 0;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index a9ca771..901ec67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -19,12 +19,7 @@
 
 package org.apache.hyracks.control.common.dataset;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hyracks.api.dataset.IDatasetManager;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;
 
 /**
@@ -33,53 +28,26 @@
 public class ResultStateSweeper implements Runnable {
 
     private final IDatasetManager datasetManager;
-
-    private final long resultTTL;
-
     private final long resultSweepThreshold;
-
     private final Logger logger;
 
-    private final List<JobId> toBeCollected;
-
-    public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold,
-            Logger logger) {
+    public ResultStateSweeper(IDatasetManager datasetManager, long resultSweepThreshold, Logger logger) {
         this.datasetManager = datasetManager;
-        this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
         this.logger = logger;
-        toBeCollected = new ArrayList<JobId>();
     }
 
     @Override
-    @SuppressWarnings("squid:S2142") // catch interrupted exception
     public void run() {
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             try {
                 Thread.sleep(resultSweepThreshold);
-                sweep();
+                datasetManager.sweepExpiredDatasets();
+                logger.trace("Result state cleanup instance successfully completed.");
             } catch (InterruptedException e) {
-                logger.log(Level.WARN, "Result cleaner thread interrupted, shutting down.");
-                break; // the interrupt was explicit from another thread. This thread should shut down...
+                logger.warn("Result cleaner thread interrupted, shutting down.");
+                Thread.currentThread().interrupt();
             }
         }
     }
-
-    private void sweep() {
-        synchronized (datasetManager) {
-            toBeCollected.clear();
-            for (JobId jobId : datasetManager.getJobIds()) {
-                final long timestamp = datasetManager.getResultTimestamp(jobId);
-                if (timestamp != -1 && System.currentTimeMillis() > timestamp + resultTTL) {
-                    toBeCollected.add(jobId);
-                }
-            }
-            for (JobId jobId : toBeCollected) {
-                datasetManager.deinitState(jobId);
-            }
-        }
-        if (logger.isTraceEnabled()) {
-            logger.trace("Result state cleanup instance successfully completed.");
-        }
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index d381a67..476aeae 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -26,11 +26,11 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.dataset.AbstractDatasetManager;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -38,7 +38,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-public class DatasetPartitionManager implements IDatasetPartitionManager {
+public class DatasetPartitionManager extends AbstractDatasetManager implements IDatasetPartitionManager {
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final NodeControllerService ncs;
@@ -55,6 +55,7 @@
 
     public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
             long resultSweepThreshold) {
+        super(resultTTL);
         this.ncs = ncs;
         this.executor = executor;
         deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -65,7 +66,7 @@
             datasetMemoryManager = null;
         }
         partitionResultStateMap = new LinkedHashMap<>();
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
+        executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
     }
 
     @Override
@@ -179,16 +180,7 @@
     }
 
     @Override
-    public synchronized long getResultTimestamp(JobId jobId) {
-        IDatasetStateRecord r = getState(jobId);
-        if (r == null) {
-            return -1;
-        }
-        return r.getTimestamp();
-    }
-
-    @Override
-    public synchronized void deinitState(JobId jobId) {
+    public synchronized void sweep(JobId jobId) {
         deinit(jobId);
         partitionResultStateMap.remove(jobId);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
index 3957401..1a64a5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultSetMap.java
@@ -37,7 +37,7 @@
     private final HashMap<ResultSetId, ResultState[]> resultStateMap;
 
     ResultSetMap() {
-        timestamp = System.currentTimeMillis();
+        timestamp = System.nanoTime();
         resultStateMap = new HashMap<>();
     }
 
