Replace the per job dataset directory service with a global directory service.

With this commit there will be only one directory service for entire Hyracks
and will be started when the cluster controller is started and all the jobs
share this directory service.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2481 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 9c4774c..3ff2c46 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -16,9 +16,12 @@
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetDirectoryService {
-    public void registerResultPartitionLocation(int partition, int nPartitions, NetworkAddress networkAddress);
+    public void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions,
+            NetworkAddress networkAddress);
 
-    public NetworkAddress[] getResultPartitionLocations(NetworkAddress[] knownLocations) throws HyracksDataException;
+    public NetworkAddress[] getResultPartitionLocations(JobId jobId, NetworkAddress[] knownLocations)
+            throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index a85af25..94f7015 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -37,11 +37,13 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.context.ICCContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.cc.dataset.DatasetDirectoryService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
@@ -119,6 +121,8 @@
 
     private final DeadNodeSweeper sweeper;
 
+    private final IDatasetDirectoryService datasetDirectoryService;
+
     private long jobCounter;
 
     public ClusterControllerService(final CCConfig ccConfig) throws Exception {
@@ -166,6 +170,7 @@
             }
         };
         sweeper = new DeadNodeSweeper();
+        datasetDirectoryService = new DatasetDirectoryService();
         jobCounter = 0;
     }
 
@@ -279,6 +284,10 @@
         }
     }
 
+    public IDatasetDirectoryService getDatasetDirectoryService() {
+        return datasetDirectoryService;
+    }
+
     private class HyracksClientInterfaceIPCI implements IIPCI {
         @Override
         public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 101cd1c..24dfa7c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -24,13 +24,11 @@
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.application.ICCBootstrap;
 import edu.uci.ics.hyracks.api.context.ICCContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
-import edu.uci.ics.hyracks.control.cc.dataset.DatasetDirectoryService;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.common.work.IResultCallback;
@@ -46,15 +44,12 @@
 
     private List<IJobLifecycleListener> jobLifecycleListeners;
 
-    private final IDatasetDirectoryService datasetDirectoryService;
-
     public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext, String appName) throws IOException {
         super(serverCtx, appName);
         this.ccContext = ccContext;
         initPendingNodeIds = new HashSet<String>();
         deinitPendingNodeIds = new HashSet<String>();
         jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
-        datasetDirectoryService = new DatasetDirectoryService();
     }
 
     @Override
@@ -137,8 +132,4 @@
     public void setDeinitializationCallback(IResultCallback<Object> deinitializationCallback) {
         this.deinitializationCallback = deinitializationCallback;
     }
-
-    public IDatasetDirectoryService getDatasetDirectoryService() {
-        return datasetDirectoryService;
-    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 402e430..ba9dfab 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -15,23 +15,28 @@
 package edu.uci.ics.hyracks.control.cc.dataset;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
 
 public class DatasetDirectoryService implements IDatasetDirectoryService {
-    private NetworkAddress[] partitionLocations;
+    private final Map<JobId, NetworkAddress[]> jobPartitionLocationsMap;
 
     public DatasetDirectoryService() {
-        partitionLocations = null;
+        jobPartitionLocationsMap = new HashMap<JobId, NetworkAddress[]>();
     }
 
     @Override
-    public synchronized void registerResultPartitionLocation(int partition, int nPartitions,
+    public synchronized void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions,
             NetworkAddress networkAddress) {
+        NetworkAddress[] partitionLocations = jobPartitionLocationsMap.get(jobId);
         if (partitionLocations == null) {
             partitionLocations = new NetworkAddress[nPartitions];
+            jobPartitionLocationsMap.put(jobId, partitionLocations);
         }
 
         partitionLocations[partition] = networkAddress;
@@ -39,15 +44,15 @@
     }
 
     @Override
-    public synchronized NetworkAddress[] getResultPartitionLocations(NetworkAddress[] knownLocations)
+    public synchronized NetworkAddress[] getResultPartitionLocations(JobId jobId, NetworkAddress[] knownLocations)
             throws HyracksDataException {
-        while (Arrays.equals(partitionLocations, knownLocations)) {
+        while (Arrays.equals(jobPartitionLocationsMap.get(jobId), knownLocations)) {
             try {
                 wait();
             } catch (InterruptedException e) {
                 throw new HyracksDataException(e);
             }
         }
-        return partitionLocations;
+        return jobPartitionLocationsMap.get(jobId);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 0fbc30b..6f26de2 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -29,7 +29,6 @@
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
@@ -58,8 +57,6 @@
 
     private final EnumSet<JobFlag> jobFlags;
 
-    private final IDatasetDirectoryService datasetDirectoryService;
-
     private final Map<ActivityClusterId, ActivityClusterPlan> activityClusterPlanMap;
 
     private final PartitionMatchMaker pmm;
@@ -87,14 +84,13 @@
     private Exception pendingException;
 
     public JobRun(ClusterControllerService ccs, JobId jobId, String applicationName,
-            IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags, IDatasetDirectoryService dds) {
+            IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
         this.jobId = jobId;
         this.applicationName = applicationName;
         this.acgg = acgg;
         this.acg = acgg.initialize();
         this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
         this.jobFlags = jobFlags;
-        this.datasetDirectoryService = dds;
         activityClusterPlanMap = new HashMap<ActivityClusterId, ActivityClusterPlan>();
         pmm = new PartitionMatchMaker();
         participatingNodeIds = new HashSet<String>();
@@ -119,10 +115,6 @@
         return jobFlags;
     }
 
-    public IDatasetDirectoryService getDatasetDirectoryService() {
-        return datasetDirectoryService;
-    }
-
     public Map<ActivityClusterId, ActivityClusterPlan> getActivityClusterPlanMap() {
         return activityClusterPlanMap;
     }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index a175126..814feb0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -19,7 +19,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
@@ -39,22 +38,12 @@
 
     @Override
     public void doRun() {
-        JobRun run = ccs.getActiveRunMap().get(jobId);
-        if (run == null) {
-            run = ccs.getRunMapArchive().get(jobId);
-        }
-        /* If run is not found even in the archives we simply return because we don't have the directory service to report
-         * to anymore.
-         */
-        if (run == null) {
-            return;
-        }
-        final IDatasetDirectoryService dds = run.getDatasetDirectoryService();
+        final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
         ccs.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
                 try {
-                    NetworkAddress[] partitionLocations = dds.getResultPartitionLocations(knownLocations);
+                    NetworkAddress[] partitionLocations = dds.getResultPartitionLocations(jobId, knownLocations);
                     callback.setValue(partitionLocations);
                 } catch (HyracksDataException e) {
                     callback.setException(e);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index db7c3ff..b062d33 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -57,8 +57,7 @@
             IActivityClusterGraphGeneratorFactory acggf = appCtx.createActivityClusterGraphGeneratorFactory(acggfBytes);
             IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(appName, jobId, appCtx,
                     jobFlags);
-            IDatasetDirectoryService dds = appCtx.getDatasetDirectoryService();
-            JobRun run = new JobRun(ccs, jobId, appName, acgg, jobFlags, dds);
+            JobRun run = new JobRun(ccs, jobId, appName, acgg, jobFlags);
             run.setStatus(JobStatus.INITIALIZED, null);
             ccs.getActiveRunMap().put(jobId, run);
             appCtx.notifyJobCreation(jobId, acggf);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index a3f984a..e18572c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -17,7 +17,6 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
 public class RegisterResultPartitionLocationWork extends AbstractWork {
@@ -38,17 +37,7 @@
 
     @Override
     public void run() {
-        JobRun run = ccs.getActiveRunMap().get(jobId);
-        if (run == null) {
-            run = ccs.getRunMapArchive().get(jobId);
-        }
-        /* If run is not found even in the archives we simply return because we don't have the directory service to report
-         * to anymore.
-         */
-        if (run == null) {
-            return;
-        }
-        run.getDatasetDirectoryService().registerResultPartitionLocation(partition, nPartitions, networkAddress);
+        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, partition, nPartitions, networkAddress);
     }
 
     @Override