Connect the whole get dataset directory service info and get result locations path from the client through the ClusterController by implementing the necessary RPC interfaces.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2477 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e34e60d..36e348b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -17,6 +17,7 @@
 import java.io.Serializable;
 import java.util.EnumSet;
 
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 
@@ -30,6 +31,8 @@
         CREATE_JOB,
         GET_JOB_STATUS,
         START_JOB,
+        GET_DATASET_DIRECTORY_SERIVICE_INFO,
+        GET_DATASET_RESULT_LOCATIONS,
         WAIT_FOR_COMPLETION,
         GET_NODE_CONTROLLERS_INFO
     }
@@ -156,6 +159,50 @@
         }
     }
 
+    public static class GetDatasetDirectoryServiceInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public GetDatasetDirectoryServiceInfoFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_DATASET_DIRECTORY_SERIVICE_INFO;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class GetDatasetResultLocationsFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final NetworkAddress[] knownLocations;
+
+        public GetDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations) {
+            this.jobId = jobId;
+            this.knownLocations = knownLocations;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_DATASET_RESULT_LOCATIONS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public NetworkAddress[] getKnownLocations() {
+            return knownLocations;
+        }
+    }
+
     public static class WaitForCompletionFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 4c06d42..ce6c2a3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -17,6 +17,7 @@
 import java.util.EnumSet;
 import java.util.Map;
 
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -76,6 +77,13 @@
     }
 
     @Override
+    public NetworkAddress getDatasetDirectoryServiceInfo(JobId jobId) throws Exception {
+        HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction(
+                jobId);
+        return (NetworkAddress) rpci.call(ipcHandle, gddsf);
+    }
+
+    @Override
     public void waitForCompletion(JobId jobId) throws Exception {
         HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
                 jobId);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 227524c..7a59880 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -26,6 +26,7 @@
 import org.apache.http.impl.client.DefaultHttpClient;
 
 import edu.uci.ics.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -118,6 +119,10 @@
         return hci.startJob(appName, JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
+    public NetworkAddress getDatasetDirectoryServiceInfo(JobId jobId) throws Exception {
+        return hci.getDatasetDirectoryServiceInfo(jobId);
+    }
+
     @Override
     public void waitForCompletion(JobId jobId) throws Exception {
         hci.waitForCompletion(jobId);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index bdbb544..2938ed0 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -18,6 +18,7 @@
 import java.util.EnumSet;
 import java.util.Map;
 
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -100,6 +101,16 @@
             throws Exception;
 
     /**
+     * Gets the IP Address and port for the DatasetDirectoryService wrapped in NetworkAddress
+     * 
+     * @param jobId
+     *            JobId of the Job
+     * @return {@link NetworkAddress}
+     * @throws Exception
+     */
+    public NetworkAddress getDatasetDirectoryServiceInfo(JobId jobId) throws Exception;
+
+    /**
      * Waits until the specified job has completed, either successfully or has
      * encountered a permanent failure.
      * 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index ef5906e..22e3a1c 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -17,6 +17,7 @@
 import java.util.EnumSet;
 import java.util.Map;
 
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -35,6 +36,8 @@
 
     public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
 
+    public NetworkAddress getDatasetDirectoryServiceInfo(JobId jobId) throws Exception;
+
     public void waitForCompletion(JobId jobId) throws Exception;
 
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 3fe2f6a..a85af25 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -268,6 +268,10 @@
         return clusterIPC;
     }
 
+    public NetworkAddress getDatasetDirectoryServiceInfo(JobId jobId) {
+        return new NetworkAddress(ccConfig.clientNetIpAddress.getBytes(), ccConfig.clientNetPort);
+    }
+
     private class DeadNodeSweeper extends TimerTask {
         @Override
         public void run() {
@@ -325,6 +329,21 @@
                     return;
                 }
 
+                case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
+                    HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = (HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction) fn;
+                    workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this, gddsf
+                            .getJobId(), new IPCResponder<NetworkAddress>(handle, mid)));
+                    return;
+                }
+
+                case GET_DATASET_RESULT_LOCATIONS: {
+                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
+                            (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+                    workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
+                            .getJobId(), gdrlf.getKnownLocations(), new IPCResponder<NetworkAddress[]>(handle, mid)));
+                    return;
+                }
+
                 case WAIT_FOR_COMPLETION: {
                     HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
                     workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),