Implement a CC-side sweeper thread to clean up result-distribution state held by the Dataset Directory Service. Job result locations are now retained for a configurable TTL (ccConfig.resultTTL) and swept at a configurable interval (ccConfig.resultSweepThreshold), replacing the previous jobHistorySize-bounded LinkedHashMap.
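
The patch reads two new CCConfig fields, resultTTL and resultSweepThreshold, whose declarations are not part of this diff. A minimal sketch of how they might be declared, assuming CCConfig's existing args4j @Option style; the flag strings and default values below are assumptions, not part of the patch:

    import org.kohsuke.args4j.Option;

    // Hypothetical sketch of the two CCConfig options referenced by this patch;
    // the field names come from the diff, but the option names and defaults are assumed.
    public class CCConfigResultOptionsSketch {
        @Option(name = "-result-ttl", usage = "Milliseconds to retain job result directory "
                + "state before it becomes eligible for sweeping (assumed default: 24 hours)")
        public long resultTTL = 24 * 60 * 60 * 1000L;

        @Option(name = "-result-sweep-threshold", usage = "Milliseconds between successive "
                + "sweeps of expired result state (assumed default: 1 minute)")
        public long resultSweepThreshold = 60 * 1000L;
    }
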
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index 563ee1b..f010aa9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -26,11 +26,14 @@
private static final long serialVersionUID = 1L;
+ private final long timestamp;
+
private Status status;
private List<Exception> exceptions;
public DatasetJobRecord() {
+ this.timestamp = System.currentTimeMillis();
this.status = Status.RUNNING;
}
@@ -51,6 +54,10 @@
this.exceptions = exceptions;
}
+ public long getTimestamp() {
+ return timestamp;
+ }
+
public Status getStatus() {
return status;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index d152cf5..ab2df55 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.dataset;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
@@ -23,6 +24,8 @@
import edu.uci.ics.hyracks.api.job.JobId;
public interface IDatasetDirectoryService extends IJobLifecycleListener {
+ public void init(ExecutorService executor);
+
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
int nPartitions, NetworkAddress networkAddress);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 1e78f09..24cb543 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -188,7 +188,7 @@
}
};
sweeper = new DeadNodeSweeper();
- datasetDirectoryService = new DatasetDirectoryService(ccConfig.jobHistorySize);
+ datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
jobCounter = 0;
deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
@@ -220,6 +220,8 @@
timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
jobLog.open();
startApplication();
+
+ datasetDirectoryService.init(executor);
LOGGER.log(Level.INFO, "Started ClusterControllerService");
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 36082b0..d7b384b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -14,10 +14,14 @@
*/
package edu.uci.ics.hyracks.control.cc.dataset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -39,16 +43,23 @@
* job.
*/
public class DatasetDirectoryService implements IDatasetDirectoryService {
+ private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
+
+ private final long resultTTL;
+
+ private final long resultSweepThreshold;
+
private final Map<JobId, DatasetJobRecord> jobResultLocations;
- public DatasetDirectoryService(final int jobHistorySize) {
- jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>() {
- private static final long serialVersionUID = 1L;
+ public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
+ this.resultTTL = resultTTL;
+ this.resultSweepThreshold = resultSweepThreshold;
+ jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>();
+ }
- protected boolean removeEldestEntry(Map.Entry<JobId, DatasetJobRecord> eldest) {
- return size() > jobHistorySize;
- }
- };
+ @Override
+ public void init(ExecutorService executor) {
+ executor.execute(new Sweeper(resultTTL, resultSweepThreshold));
}
@Override
@@ -243,4 +254,49 @@
}
return null;
}
+
+ class Sweeper implements Runnable {
+ private final long resultTTL;
+
+ private final long resultSweepThreshold;
+
+ private final List<JobId> toBeCollected;
+
+ public Sweeper(long resultTTL, long resultSweepThreshold) {
+ this.resultTTL = resultTTL;
+ this.resultSweepThreshold = resultSweepThreshold;
+ toBeCollected = new ArrayList<JobId>();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(resultSweepThreshold);
+ sweep();
+ } catch (InterruptedException e) {
+ LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
+ // There isn't much we can do really here
+ }
+ }
+
+ }
+
+ private void sweep() {
+ synchronized (DatasetDirectoryService.this) {
+ toBeCollected.clear();
+ for (Map.Entry<JobId, DatasetJobRecord> entry : jobResultLocations.entrySet()) {
+ if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
+ toBeCollected.add(entry.getKey());
+ }
+ }
+ for (JobId jobId : toBeCollected) {
+ jobResultLocations.remove(jobId);
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Result state cleanup instance successfully completed.");
+ }
+ }
+ }
}
\ No newline at end of file