Implement an NC-side sweeper thread to clean up result distribution states and files.
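
This replaces the fixed-size result history, where a LinkedHashMap evicted its eldest job once more than resultHistorySize jobs had accumulated, with time-based cleanup: each job's result-set map is stamped with its creation time, and a sweeper task running on the NC's executor wakes up every resultSweepThreshold milliseconds to deinitialize and remove any entry older than resultTTL milliseconds. A minimal, self-contained sketch of the pattern (illustrative names, not the Hyracks API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Minimal sketch of TTL-based sweeping over a state map; illustrative only.
    public final class TtlSweepSketch {
        // Each value remembers when it was created, like ResultSetMap in the patch.
        static final class Timestamped<V> {
            final V value;
            final long createdAt = System.currentTimeMillis();

            Timestamped(V value) {
                this.value = value;
            }
        }

        private final Map<String, Timestamped<String>> states = new HashMap<>();
        private final long ttlMillis;

        TtlSweepSketch(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        synchronized void put(String key, String value) {
            states.put(key, new Timestamped<>(value));
        }

        // One sweep pass: collect expired keys first, then remove them, so the
        // map is never structurally modified while it is being iterated.
        synchronized void sweep() {
            long now = System.currentTimeMillis();
            List<String> expired = new ArrayList<>();
            for (Map.Entry<String, Timestamped<String>> e : states.entrySet()) {
                if (now > e.getValue().createdAt + ttlMillis) {
                    expired.add(e.getKey());
                }
            }
            for (String key : expired) {
                states.remove(key); // deinitState() would release files here
            }
        }

        synchronized boolean contains(String key) {
            return states.containsKey(key);
        }

        public static void main(String[] args) throws InterruptedException {
            TtlSweepSketch sketch = new TtlSweepSketch(100);
            sketch.put("job-1", "result");
            Thread.sleep(150); // let the entry age past its TTL
            sketch.sweep();
            System.out.println(sketch.contains("job-1")); // prints: false
        }
    }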
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index f82aa37..46f65c0 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -217,7 +217,7 @@
private void init() throws Exception {
ctx.getIOManager().setExecutor(executor);
datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
- ncConfig.resultHistorySize);
+ ncConfig.resultTTL, ncConfig.resultSweepThreshold);
datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
datasetPartitionManager, ncConfig.nNetThreads);
}
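
The remaining changes are in DatasetPartitionManager. Conceptually, the new constructor arguments trade a size cap for a time cap: instead of retaining the results of the last resultHistorySize jobs, the node retains each job's results for resultTTL milliseconds and checks for expired entries every resultSweepThreshold milliseconds. In the worst case an entry can therefore survive for up to resultTTL + resultSweepThreshold milliseconds, since expiry is only observed at sweep time. The patch drives the sweep with a free-running thread that sleeps between passes; a ScheduledExecutorService could drive it instead. A hedged sketch of that alternative (illustrative names, not what this patch does):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Alternative scheduling sketch; the patch instead submits a Runnable
    // that sleeps in a loop on the NC's shared executor.
    public final class SweepScheduling {
        public static void main(String[] args) {
            long resultSweepThreshold = 1000; // sweep interval in ms
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            // scheduleWithFixedDelay waits for each sweep to finish before
            // starting the delay for the next one.
            scheduler.scheduleWithFixedDelay(() -> System.out.println("sweep pass"),
                    resultSweepThreshold, resultSweepThreshold, TimeUnit.MILLISECONDS);
            // Let the demo run a few passes, then shut down; a real NC would
            // keep the scheduler for its lifetime.
            scheduler.schedule(scheduler::shutdown, 3500, TimeUnit.MILLISECONDS);
        }
    }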
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 6c10cd4..a0e78f8 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,11 +14,14 @@
*/
package edu.uci.ics.hyracks.control.nc.dataset;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
+import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -40,7 +43,7 @@
private final Executor executor;
- private final Map<JobId, Map<ResultSetId, ResultState[]>> partitionResultStateMap;
+ private final Map<JobId, ResultSetMap> partitionResultStateMap;
private final DefaultDeallocatableRegistry deallocatableRegistry;
@@ -48,8 +51,8 @@
private final DatasetMemoryManager datasetMemoryManager;
- public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory,
- final int resultHistorySize) {
+ public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
+ long resultSweepThreshold) {
this.ncs = ncs;
this.executor = executor;
deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -59,19 +62,8 @@
} else {
datasetMemoryManager = null;
}
- partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
- private static final long serialVersionUID = 1L;
-
- protected boolean removeEldestEntry(Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
- synchronized (DatasetPartitionManager.this) {
- if (size() > resultHistorySize) {
- deinitState(eldest);
- return true;
- }
- return false;
- }
- }
- };
+ partitionResultStateMap = new LinkedHashMap<JobId, ResultSetMap>();
+ executor.execute(new Sweeper(resultTTL, resultSweepThreshold));
}
@Override
@@ -86,9 +78,9 @@
dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, partition, datasetMemoryManager,
fileFactory);
- Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
- rsIdMap = new HashMap<ResultSetId, ResultState[]>();
+ rsIdMap = new ResultSetMap();
partitionResultStateMap.put(jobId, rsIdMap);
}
@@ -134,7 +126,7 @@
IFrameWriter writer) throws HyracksException {
ResultState resultState;
synchronized (this) {
- Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
throw new HyracksException("Unknown JobId " + jobId);
@@ -188,7 +180,7 @@
@Override
public synchronized void abortReader(JobId jobId) {
- Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
return;
@@ -213,14 +205,14 @@
@Override
public synchronized void close() {
- for (Entry<JobId, Map<ResultSetId, ResultState[]>> entry : partitionResultStateMap.entrySet()) {
+ for (Entry<JobId, ResultSetMap> entry : partitionResultStateMap.entrySet()) {
deinitState(entry);
}
deallocatableRegistry.close();
}
- public void deinitState(Entry<JobId, Map<ResultSetId, ResultState[]>> entry) {
- Map<ResultSetId, ResultState[]> rsIdMap = entry.getValue();
+ private void deinitState(Entry<JobId, ResultSetMap> entry) {
+ ResultSetMap rsIdMap = entry.getValue();
if (rsIdMap != null) {
for (ResultSetId rsId : rsIdMap.keySet()) {
ResultState[] resultStates = rsIdMap.get(rsId);
@@ -236,4 +228,65 @@
}
}
}
+
+ private static class ResultSetMap extends HashMap<ResultSetId, ResultState[]> {
+ private static final long serialVersionUID = 1L;
+
+ // Creation time of this job's result state; the sweeper evicts the entry
+ // once it is older than the configured result TTL.
+ private final long timestamp;
+
+ public ResultSetMap() {
+ timestamp = System.currentTimeMillis();
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+ }
+
+ // Periodically scans partitionResultStateMap and deallocates the result
+ // state and files of any job whose TTL has expired.
+ private class Sweeper implements Runnable {
+ private final long resultTTL;
+
+ private final long resultSweepThreshold;
+
+ private final List<JobId> toBeCollected;
+
+ public Sweeper(long resultTTL, long resultSweepThreshold) {
+ this.resultTTL = resultTTL;
+ this.resultSweepThreshold = resultSweepThreshold;
+ toBeCollected = new ArrayList<JobId>();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(resultSweepThreshold);
+ sweep();
+ } catch (InterruptedException e) {
+ LOGGER.severe("Result sweeper thread interrupted; continuing to run.");
+ // Not much else we can do here; keep sweeping so results still get cleaned up.
+ }
+ }
+ }
+
+ private void sweep() {
+ toBeCollected.clear();
+ final long now = System.currentTimeMillis();
+ synchronized (DatasetPartitionManager.this) {
+ // Deinitialize expired entries while iterating, but defer the actual
+ // removals so the map is not structurally modified mid-iteration.
+ for (Map.Entry<JobId, ResultSetMap> entry : partitionResultStateMap.entrySet()) {
+ if (now > entry.getValue().getTimestamp() + resultTTL) {
+ toBeCollected.add(entry.getKey());
+ deinitState(entry);
+ }
+ }
+ for (JobId jobId : toBeCollected) {
+ partitionResultStateMap.remove(jobId);
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Result state sweep completed.");
+ }
+ }
+ }
}
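
One design note: the sweeper deliberately swallows InterruptedException so that result cleanup keeps running for the lifetime of the NC. If the sweeper instead needed to shut down cleanly with the node, the conventional pattern is to restore the interrupt flag and exit the loop. A self-contained sketch of that variant (an assumption about desired shutdown behavior, not part of this patch):

    // Interrupt-aware variant of the sweep loop; illustrative, not the patch's code.
    public final class InterruptibleSweeper implements Runnable {
        private final long sweepIntervalMillis;

        public InterruptibleSweeper(long sweepIntervalMillis) {
            this.sweepIntervalMillis = sweepIntervalMillis;
        }

        private void sweep() {
            System.out.println("sweep pass"); // stand-in for the real cleanup
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(sweepIntervalMillis);
                    sweep();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                    return;
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Thread t = new Thread(new InterruptibleSweeper(100));
            t.start();
            Thread.sleep(350); // let a few sweeps run
            t.interrupt();     // sweeper exits cleanly
            t.join();
        }
    }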