Add support for stopping the ResultStateSweeper thread in to the classes that create it.
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 0463350..629ac5d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -242,6 +242,7 @@
@Override
public void stop() throws Exception {
LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
+ datasetDirectoryService.stop();
executor.shutdownNow();
webServer.stop();
sweeper.cancel();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ee1cc67..7b060c7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -41,21 +42,28 @@
* job.
*/
public class DatasetDirectoryService implements IDatasetDirectoryService {
+ private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
+
private final long resultTTL;
private final long resultSweepThreshold;
private final Map<JobId, IDatasetStateRecord> jobResultLocations;
+ private ResultStateSweeper resultStateSweeper;
+
public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
+ resultStateSweeper = null;
}
@Override
public void init(ExecutorService executor) {
- executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+ LOGGER.info("New ResultStateSweepser in DatasetDirectoryService.");
+ resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
+ executor.execute(resultStateSweeper);
}
@Override
@@ -260,4 +268,9 @@
}
return null;
}
+
+ @Override
+ public void stop() {
+ resultStateSweeper.close();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 3de3c50..05ed177 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -42,4 +42,6 @@
public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
+
+ public void stop();
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
index 69b560c..a5ad630 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -39,16 +39,19 @@
private final List<JobId> toBeCollected;
+ private boolean running;
+
public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
this.datasetManager = datasetManager;
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
toBeCollected = new ArrayList<JobId>();
+ running = true;
}
@Override
public void run() {
- while (true) {
+ while (running) {
try {
Thread.sleep(resultSweepThreshold);
sweep();
@@ -57,9 +60,14 @@
// There isn't much we can do really here
}
}
-
+ LOGGER.info("Result cleaner thread has stopped.");
}
+ public void close() {
+ running = false;
+ LOGGER.info("Result cleaner thread has been flagged to stop.");
+ }
+
private void sweep() {
synchronized (datasetManager) {
toBeCollected.clear();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 325e2bc..b28ffaa 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -50,6 +50,8 @@
private final DatasetMemoryManager datasetMemoryManager;
+ private final ResultStateSweeper resultStateSweeper;
+
public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
long resultSweepThreshold) {
this.ncs = ncs;
@@ -62,7 +64,8 @@
datasetMemoryManager = null;
}
partitionResultStateMap = new LinkedHashMap<JobId, IDatasetStateRecord>();
- executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+ resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
+ executor.execute(resultStateSweeper);
}
@Override
@@ -208,6 +211,7 @@
deinit(entry.getKey());
}
deallocatableRegistry.close();
+ resultStateSweeper.close();
}
@Override