Register the per application DatasetDirectoryService with the JobRun when the job starts.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2475 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 6f26de2..0fbc30b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -29,6 +29,7 @@
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
@@ -57,6 +58,8 @@
private final EnumSet<JobFlag> jobFlags;
+ private final IDatasetDirectoryService datasetDirectoryService;
+
private final Map<ActivityClusterId, ActivityClusterPlan> activityClusterPlanMap;
private final PartitionMatchMaker pmm;
@@ -84,13 +87,14 @@
private Exception pendingException;
public JobRun(ClusterControllerService ccs, JobId jobId, String applicationName,
- IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+ IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags, IDatasetDirectoryService dds) {
this.jobId = jobId;
this.applicationName = applicationName;
this.acgg = acgg;
this.acg = acgg.initialize();
this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
this.jobFlags = jobFlags;
+ this.datasetDirectoryService = dds;
activityClusterPlanMap = new HashMap<ActivityClusterId, ActivityClusterPlan>();
pmm = new PartitionMatchMaker();
participatingNodeIds = new HashSet<String>();
@@ -115,6 +119,10 @@
return jobFlags;
}
+ public IDatasetDirectoryService getDatasetDirectoryService() {
+ return datasetDirectoryService;
+ }
+
public Map<ActivityClusterId, ActivityClusterPlan> getActivityClusterPlanMap() {
return activityClusterPlanMap;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index b6a33cd..db7c3ff 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -16,6 +16,7 @@
import java.util.EnumSet;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -56,7 +57,8 @@
IActivityClusterGraphGeneratorFactory acggf = appCtx.createActivityClusterGraphGeneratorFactory(acggfBytes);
IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(appName, jobId, appCtx,
jobFlags);
- JobRun run = new JobRun(ccs, jobId, appName, acgg, jobFlags);
+ IDatasetDirectoryService dds = appCtx.getDatasetDirectoryService();
+ JobRun run = new JobRun(ccs, jobId, appName, acgg, jobFlags, dds);
run.setStatus(JobStatus.INITIALIZED, null);
ccs.getActiveRunMap().put(jobId, run);
appCtx.notifyJobCreation(jobId, acggf);