Refactor out the result state sweeper code.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index f010aa9..26f0b7a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -17,7 +17,7 @@
import java.util.HashMap;
import java.util.List;
-public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
+public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> implements IDatasetStateRecord {
public enum Status {
RUNNING,
SUCCESS,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index ab2df55..5380d60 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.JobId;
-public interface IDatasetDirectoryService extends IJobLifecycleListener {
+public interface IDatasetDirectoryService extends IJobLifecycleListener, IDatasetManager {
public void init(ExecutorService executor);
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
new file mode 100644
index 0000000..b6cd1af
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public interface IDatasetManager {
+ public Map<JobId, IDatasetStateRecord> getStateMap();
+
+ public void deinitState(JobId jobId);
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 1730165..0935211 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -20,7 +20,7 @@
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
-public interface IDatasetPartitionManager {
+public interface IDatasetPartitionManager extends IDatasetManager {
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
boolean asyncMode, int partition, int nPartitions) throws HyracksException;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
new file mode 100644
index 0000000..1a400e3
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+public interface IDatasetStateRecord {
+ public long getTimestamp();
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index d7b384b..a03d79f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -14,26 +14,25 @@
*/
package edu.uci.ics.hyracks.control.cc.dataset;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
/**
* TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
@@ -43,29 +42,27 @@
* job.
*/
public class DatasetDirectoryService implements IDatasetDirectoryService {
- private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
-
private final long resultTTL;
private final long resultSweepThreshold;
- private final Map<JobId, DatasetJobRecord> jobResultLocations;
+ private final Map<JobId, IDatasetStateRecord> jobResultLocations;
public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
- jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>();
+ jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
}
@Override
public void init(ExecutorService executor) {
- executor.execute(new Sweeper(resultTTL, resultSweepThreshold));
+ executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
}
@Override
public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
throws HyracksException {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
if (djr == null) {
djr = new DatasetJobRecord();
jobResultLocations.put(jobId, djr);
@@ -85,7 +82,7 @@
@Override
public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
int partition, int nPartitions, NetworkAddress networkAddress) {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
ResultSetMetaData resultSetMetaData = djr.get(rsId);
if (resultSetMetaData == null) {
@@ -106,7 +103,7 @@
public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
int successCount = 0;
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
ResultSetMetaData resultSetMetaData = djr.get(rsId);
DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
records[partition].writeEOS();
@@ -124,7 +121,7 @@
@Override
public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
if (djr != null) {
djr.fail();
}
@@ -133,7 +130,7 @@
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
if (djr != null) {
djr.fail(exceptions);
}
@@ -143,7 +140,7 @@
@Override
public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
DatasetJobRecord djr;
- while ((djr = jobResultLocations.get(jobId)) == null) {
+ while ((djr = (DatasetJobRecord) jobResultLocations.get(jobId)) == null) {
try {
wait();
} catch (InterruptedException e) {
@@ -155,6 +152,16 @@
}
@Override
+ public Map<JobId, IDatasetStateRecord> getStateMap() {
+ return jobResultLocations;
+ }
+
+ @Override
+ public void deinitState(JobId jobId) {
+ jobResultLocations.remove(jobId);
+ }
+
+ @Override
public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
DatasetDirectoryRecord[] newRecords;
@@ -202,7 +209,7 @@
*/
private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
throws HyracksDataException {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
+ DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
if (djr == null) {
throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
@@ -254,49 +261,4 @@
}
return null;
}
-
- class Sweeper implements Runnable {
- private final long resultTTL;
-
- private final long resultSweepThreshold;
-
- private final List<JobId> toBeCollected;
-
- public Sweeper(long resultTTL, long resultSweepThreshold) {
- this.resultTTL = resultTTL;
- this.resultSweepThreshold = resultSweepThreshold;
- toBeCollected = new ArrayList<JobId>();
- }
-
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(resultSweepThreshold);
- sweep();
- } catch (InterruptedException e) {
- LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
- // There isn't much we can do really here
- }
- }
-
- }
-
- private void sweep() {
- synchronized (DatasetDirectoryService.this) {
- toBeCollected.clear();
- for (Map.Entry<JobId, DatasetJobRecord> entry : jobResultLocations.entrySet()) {
- if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
- toBeCollected.add(entry.getKey());
- }
- }
- for (JobId jobId : toBeCollected) {
- jobResultLocations.remove(jobId);
- }
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Result state cleanup instance successfully completed.");
- }
- }
- }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
new file mode 100644
index 0000000..69b560c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.dataset;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * Sweeper to clean up the stale result distribution files and result states.
+ */
+public class ResultStateSweeper implements Runnable {
+ private static final Logger LOGGER = Logger.getLogger(ResultStateSweeper.class.getName());
+
+ private final IDatasetManager datasetManager;
+
+ private final long resultTTL;
+
+ private final long resultSweepThreshold;
+
+ private final List<JobId> toBeCollected;
+
+ public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
+ this.datasetManager = datasetManager;
+ this.resultTTL = resultTTL;
+ this.resultSweepThreshold = resultSweepThreshold;
+ toBeCollected = new ArrayList<JobId>();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(resultSweepThreshold);
+ sweep();
+ } catch (InterruptedException e) {
+ LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
+ // There isn't much we can do really here
+ }
+ }
+
+ }
+
+ private void sweep() {
+ synchronized (datasetManager) {
+ toBeCollected.clear();
+ for (Map.Entry<JobId, IDatasetStateRecord> entry : datasetManager.getStateMap().entrySet()) {
+ if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
+ toBeCollected.add(entry.getKey());
+ }
+ }
+ for (JobId jobId : toBeCollected) {
+ datasetManager.deinitState(jobId);
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Result state cleanup instance successfully completed.");
+ }
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index a0e78f8..1299593 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,23 +14,22 @@
*/
package edu.uci.ics.hyracks.control.nc.dataset;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
-import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -43,7 +42,7 @@
private final Executor executor;
- private final Map<JobId, ResultSetMap> partitionResultStateMap;
+ private final Map<JobId, IDatasetStateRecord> partitionResultStateMap;
private final DefaultDeallocatableRegistry deallocatableRegistry;
@@ -62,8 +61,8 @@
} else {
datasetMemoryManager = null;
}
- partitionResultStateMap = new LinkedHashMap<JobId, ResultSetMap>();
- executor.execute(new Sweeper(resultTTL, resultSweepThreshold));
+ partitionResultStateMap = new LinkedHashMap<JobId, IDatasetStateRecord>();
+ executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
}
@Override
@@ -78,7 +77,7 @@
dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, partition, datasetMemoryManager,
fileFactory);
- ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
rsIdMap = new ResultSetMap();
partitionResultStateMap.put(jobId, rsIdMap);
@@ -126,7 +125,7 @@
IFrameWriter writer) throws HyracksException {
ResultState resultState;
synchronized (this) {
- ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
throw new HyracksException("Unknown JobId " + jobId);
@@ -151,7 +150,7 @@
@Override
public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
- ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
if (rsIdMap != null) {
ResultState[] resultStates = rsIdMap.get(resultSetId);
if (resultStates != null) {
@@ -180,7 +179,7 @@
@Override
public synchronized void abortReader(JobId jobId) {
- ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
return;
@@ -205,14 +204,20 @@
@Override
public synchronized void close() {
- for (Entry<JobId, ResultSetMap> entry : partitionResultStateMap.entrySet()) {
- deinitState(entry);
+ for (JobId jobId : partitionResultStateMap.keySet().toArray(new JobId[0])) {
+ deinitState(jobId);
}
deallocatableRegistry.close();
}
- private void deinitState(Entry<JobId, ResultSetMap> entry) {
- ResultSetMap rsIdMap = entry.getValue();
+ @Override
+ public Map<JobId, IDatasetStateRecord> getStateMap() {
+ return partitionResultStateMap;
+ }
+
+ @Override
+ public void deinitState(JobId jobId) {
+ ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.remove(jobId);
if (rsIdMap != null) {
for (ResultSetId rsId : rsIdMap.keySet()) {
ResultState[] resultStates = rsIdMap.get(rsId);
@@ -221,7 +226,7 @@
ResultState state = resultStates[i];
if (state != null) {
state.closeAndDelete();
- LOGGER.fine("Removing partition: " + i + " for JobId: " + entry.getKey());
+ LOGGER.fine("Removing partition: " + i + " for JobId: " + jobId);
}
}
}
@@ -229,7 +234,7 @@
}
}
- private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> {
+ private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> implements IDatasetStateRecord {
private static final long serialVersionUID = 1L;
long timestamp;
@@ -243,50 +248,4 @@
return timestamp;
}
}
-
- class Sweeper implements Runnable {
- private final long resultTTL;
-
- private final long resultSweepThreshold;
-
- private final List<JobId> toBeCollected;
-
- public Sweeper(long resultTTL, long resultSweepThreshold) {
- this.resultTTL = resultTTL;
- this.resultSweepThreshold = resultSweepThreshold;
- toBeCollected = new ArrayList<JobId>();
- }
-
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(resultSweepThreshold);
- sweep();
- } catch (InterruptedException e) {
- LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
- // There isn't much we can do really here
- }
- }
-
- }
-
- private void sweep() {
- toBeCollected.clear();
- synchronized (DatasetPartitionManager.this) {
- for (Map.Entry<JobId, ResultSetMap> entry : partitionResultStateMap.entrySet()) {
- if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
- toBeCollected.add(entry.getKey());
- deinitState(entry);
- }
- }
- for (JobId jobId : toBeCollected) {
- partitionResultStateMap.remove(jobId);
- }
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Result state cleanup instance successfully completed.");
- }
- }
- }
}