Instead of adding stop commands to the running thread, this patch changes the thread to be a daemon thread.
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 629ac5d..0463350 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -242,7 +242,6 @@
@Override
public void stop() throws Exception {
LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
- datasetDirectoryService.stop();
executor.shutdownNow();
webServer.stop();
sweeper.cancel();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 7b060c7..ee1cc67 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -19,7 +19,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -42,28 +41,21 @@
* job.
*/
public class DatasetDirectoryService implements IDatasetDirectoryService {
- private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
-
private final long resultTTL;
private final long resultSweepThreshold;
private final Map<JobId, IDatasetStateRecord> jobResultLocations;
- private ResultStateSweeper resultStateSweeper;
-
public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
- resultStateSweeper = null;
}
@Override
public void init(ExecutorService executor) {
- LOGGER.info("New ResultStateSweepser in DatasetDirectoryService.");
- resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
- executor.execute(resultStateSweeper);
+ executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
}
@Override
@@ -268,9 +260,4 @@
}
return null;
}
-
- @Override
- public void stop() {
- resultStateSweeper.close();
- }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 05ed177..3de3c50 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -42,6 +42,4 @@
public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
-
- public void stop();
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
index a5ad630..69b560c 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -39,19 +39,16 @@
private final List<JobId> toBeCollected;
- private boolean running;
-
public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
this.datasetManager = datasetManager;
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
toBeCollected = new ArrayList<JobId>();
- running = true;
}
@Override
public void run() {
- while (running) {
+ while (true) {
try {
Thread.sleep(resultSweepThreshold);
sweep();
@@ -60,13 +57,8 @@
// There isn't much we can do really here
}
}
- LOGGER.info("Result cleaner thread has stopped.");
- }
- public void close() {
- running = false;
- LOGGER.info("Result cleaner thread has been flagged to stop.");
- }
+ }
private void sweep() {
synchronized (datasetManager) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index b28ffaa..325e2bc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -50,8 +50,6 @@
private final DatasetMemoryManager datasetMemoryManager;
- private final ResultStateSweeper resultStateSweeper;
-
public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
long resultSweepThreshold) {
this.ncs = ncs;
@@ -64,8 +62,7 @@
datasetMemoryManager = null;
}
partitionResultStateMap = new LinkedHashMap<JobId, IDatasetStateRecord>();
- resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
- executor.execute(resultStateSweeper);
+ executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
}
@Override
@@ -211,7 +208,6 @@
deinit(entry.getKey());
}
deallocatableRegistry.close();
- resultStateSweeper.close();
}
@Override