Implement job cleaner using the history property.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3271 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 371169f..52e6005 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -17,9 +17,10 @@
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.JobId;
-public interface IDatasetDirectoryService {
+public interface IDatasetDirectoryService extends IJobLifecycleListener {
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
int nPartitions, NetworkAddress networkAddress);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5ecfe1d..506a870 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -205,6 +205,7 @@
private void startApplication() throws Exception {
appCtx = new CCApplicationContext(serverCtx, ccContext);
+ appCtx.addJobLifecycleListener(datasetDirectoryService);
String className = ccConfig.appCCMainClass;
if (className != null) {
Class<?> c = Class.forName(className);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index bd218eb..cdcdf4c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -26,6 +26,8 @@
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobId;
/**
@@ -46,13 +48,31 @@
return size() > jobHistorySize;
}
};
-;
+ }
+
+ @Override
+ public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+ DatasetJobRecord djr = jobResultLocations.get(jobId);
+ if (djr == null) {
+ djr = new DatasetJobRecord();
+ jobResultLocations.put(jobId, djr);
+ }
+ }
+
+ @Override
+ public void notifyJobStart(JobId jobId) throws HyracksException {
+ // Auto-generated method stub
+ }
+
+ @Override
+ public void notifyJobFinish(JobId jobId) throws HyracksException {
+ // Auto-generated method stub
}
@Override
public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
int partition, int nPartitions, NetworkAddress networkAddress) {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ DatasetJobRecord djr = jobResultLocations.get(jobId);
ResultSetMetaData resultSetMetaData = djr.get(rsId);
if (resultSetMetaData == null) {
@@ -91,14 +111,14 @@
@Override
public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ DatasetJobRecord djr = jobResultLocations.get(jobId);
djr.fail();
notifyAll();
}
@Override
public synchronized void reportJobFailure(JobId jobId) {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ DatasetJobRecord djr = jobResultLocations.get(jobId);
djr.fail();
notifyAll();
}
@@ -165,7 +185,11 @@
*/
private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
throws HyracksDataException {
- DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ DatasetJobRecord djr = jobResultLocations.get(jobId);
+
+ if (djr == null) {
+ throw new HyracksDataException("Requested JobId " + jobId + "doesn't exist");
+ }
if (djr.getStatus() == Status.FAILED) {
throw new HyracksDataException("Job failed.");
@@ -208,13 +232,4 @@
}
return null;
}
-
- private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
- DatasetJobRecord djr = jobResultLocations.get(jobId);
- if (djr == null) {
- djr = new DatasetJobRecord();
- jobResultLocations.put(jobId, djr);
- }
- return djr;
- }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index cd58c3e..1e58b5c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -48,16 +48,22 @@
final int resultHistorySize) {
this.ncs = ncs;
this.executor = executor;
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+ datasetMemoryManager = new DatasetMemoryManager(availableMemory);
partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
private static final long serialVersionUID = 1L;
protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
- return size() > resultHistorySize;
+ if (size() > resultHistorySize) {
+ for (ResultState state : eldest.getValue()) {
+ state.deinit();
+ }
+ return true;
+ }
+ return false;
}
};
- deallocatableRegistry = new DefaultDeallocatableRegistry();
- fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
- datasetMemoryManager = new DatasetMemoryManager(availableMemory);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 3db3fd9..0f1d94c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -64,6 +64,10 @@
notifyAll();
}
+ public synchronized void deinit() {
+ fileRef.delete();
+ }
+
public ResultSetPartitionId getResultSetPartitionId() {
return resultSetPartitionId;
}