Use LinkedHashMap to remove the state of old entries.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3261 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 05365d3..5ecfe1d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -170,7 +170,7 @@
}
};
sweeper = new DeadNodeSweeper();
- datasetDirectoryService = new DatasetDirectoryService();
+ datasetDirectoryService = new DatasetDirectoryService(ccConfig.jobHistorySize);
jobCounter = 0;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 13ae426..bd218eb 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -15,7 +15,7 @@
package edu.uci.ics.hyracks.control.cc.dataset;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
@@ -38,8 +38,15 @@
public class DatasetDirectoryService implements IDatasetDirectoryService {
private final Map<JobId, DatasetJobRecord> jobResultLocations;
- public DatasetDirectoryService() {
- jobResultLocations = new HashMap<JobId, DatasetJobRecord>();
+ public DatasetDirectoryService(final int jobHistorySize) {
+ jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>() {
+ private static final long serialVersionUID = 1L;
+
+ protected boolean removeEldestEntry(Map.Entry<JobId, DatasetJobRecord> eldest) {
+ return size() > jobHistorySize;
+ }
+ };
+;
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index d1577bc..ec29592 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -51,6 +51,9 @@
@Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
public int maxMemory = -1;
+ @Option(name = "-result-history-size", usage = "Limits the number of jobs whose results should be remembered by the system to the specified value. (default: 10)")
+ public int resultHistorySize = 100;
+
@Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
public int resultManagerMemory = -1;
@@ -79,10 +82,12 @@
cList.add(String.valueOf(nNetThreads));
cList.add("-max-memory");
cList.add(String.valueOf(maxMemory));
+ cList.add("-result-history-size");
+ cList.add(String.valueOf(resultHistorySize));
cList.add("-result-manager-memory");
cList.add(String.valueOf(resultManagerMemory));
- if (appNCMainClass != null) {
+ if (appNCMainClass != null) {
cList.add("-app-nc-main-class");
cList.add(appNCMainClass);
}
@@ -92,5 +97,5 @@
cList.add(appArg);
}
}
- }
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 10e0dba..e15c60e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -150,7 +150,8 @@
partitionManager = new PartitionManager(this);
netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
- datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory);
+ datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+ ncConfig.resultHistorySize);
datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
datasetPartitionManager, ncConfig.nNetThreads);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 1cad54b..cd58c3e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.control.nc.dataset;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -44,10 +44,17 @@
private final DatasetMemoryManager datasetMemoryManager;
- public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory) {
+ public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory,
+ final int resultHistorySize) {
this.ncs = ncs;
this.executor = executor;
- partitionResultStateMap = new HashMap<JobId, ResultState[]>();
+ partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
+ private static final long serialVersionUID = 1L;
+
+ protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
+ return size() > resultHistorySize;
+ }
+ };
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
datasetMemoryManager = new DatasetMemoryManager(availableMemory);