Implement the client side of RPCs to give the clients the ability to retrieve serialized record descriptor.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2831 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index abadc7e..f27a44e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -33,6 +33,7 @@
         GET_JOB_STATUS,
         START_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
+        GET_DATASET_RECORD_DESCRIPTOR,
         GET_DATASET_RESULT_LOCATIONS,
         WAIT_FOR_COMPLETION,
         GET_NODE_CONTROLLERS_INFO
@@ -163,20 +164,36 @@
     public static class GetDatasetDirectoryServiceInfoFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
-
-        public GetDatasetDirectoryServiceInfoFunction(JobId jobId) {
-            this.jobId = jobId;
-        }
-
         @Override
         public FunctionId getFunctionId() {
             return FunctionId.GET_DATASET_DIRECTORY_SERIVICE_INFO;
         }
+    }
+
+    public static class GetDatasetRecordDescriptorFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final ResultSetId rsId;
+
+        public GetDatasetRecordDescriptorFunction(JobId jobId, ResultSetId rsId) {
+            this.jobId = jobId;
+            this.rsId = rsId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_DATASET_RECORD_DESCRIPTOR;
+        }
 
         public JobId getJobId() {
             return jobId;
         }
+
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
     }
 
     public static class GetDatasetResultLocationsFunction extends Function {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index f47e41b..b8b67bb 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -22,6 +22,8 @@
     public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
             int nPartitions, NetworkAddress networkAddress);
 
+    public byte[] getRecordDescriptor(JobId jobId, ResultSetId rsId) throws HyracksDataException;
+
     public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index 29fabe2..ae04474 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -18,6 +18,18 @@
 
 public interface IHyracksDatasetDirectoryServiceConnection {
     /**
+     * Gets the record descriptor for the given result set.
+     * 
+     * @param jobId
+     *            ID of the job
+     * @param rsId
+     *            ID of the result set
+     * @return {@link byte[]}
+     * @throws Exception
+     */
+    public byte[] getDatasetSerializedRecordDescriptor(JobId jobId, ResultSetId rsId) throws Exception;
+
+    /**
      * Gets the IP Addresses and ports for the partition generating the result for each location.
      * 
      * @param jobId
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index d1db448..8d536d6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -18,6 +18,18 @@
 
 public interface IHyracksDatasetDirectoryServiceInterface {
     /**
+     * Gets the record descriptor for the given result set.
+     * 
+     * @param jobId
+     *            ID of the job
+     * @param rsId
+     *            ID of the result set
+     * @return {@link byte[]}
+     * @throws Exception
+     */
+    public byte[] getDatasetSerializedRecordDescriptorFunction(JobId jobId, ResultSetId rsId) throws Exception;
+
+    /**
      * Gets the IP Addresses and ports for the partition generating the result for each location.
      * 
      * @param jobId
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 1399dcc..1050983 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -40,6 +40,11 @@
     }
 
     @Override
+    public byte[] getDatasetSerializedRecordDescriptor(JobId jobId, ResultSetId rsId) throws Exception {
+        return ddsi.getDatasetSerializedRecordDescriptorFunction(jobId, rsId);
+    }
+
+    @Override
     public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownRecords) throws Exception {
         return ddsi.getDatasetResultLocationsFunction(jobId, rsId, knownRecords);
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index fb6e41f..9de26dd 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -34,6 +34,13 @@
     }
 
     @Override
+    public byte[] getDatasetSerializedRecordDescriptorFunction(JobId jobId, ResultSetId rsId) throws Exception {
+        HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction gdrdf = new HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction(
+                jobId, rsId);
+        return (byte[]) rpci.call(ipcHandle, gdrdf);
+    }
+
+    @Override
     public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownRecords) throws Exception {
         HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 8097ad4..4b61d09 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -56,6 +56,7 @@
 import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
 import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import edu.uci.ics.hyracks.control.cc.work.GetRecordDescriptorWork;
 import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
 import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
 import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
@@ -346,6 +347,13 @@
                     return;
                 }
 
+                case GET_DATASET_RECORD_DESCRIPTOR: {
+                    HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction gdrdf = (HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction) fn;
+                    workQueue.schedule(new GetRecordDescriptorWork(ClusterControllerService.this, gdrdf.getJobId(),
+                            gdrdf.getResultSetId(), new IPCResponder<byte[]>(handle, mid)));
+                    return;
+                }
+
                 case GET_DATASET_RESULT_LOCATIONS: {
                     HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
                     workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetRecordDescriptorWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetRecordDescriptorWork.java
new file mode 100644
index 0000000..02ad1d8
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetRecordDescriptorWork.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetRecordDescriptorWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+
+    private final JobId jobId;
+
+    private final ResultSetId rsId;
+
+    private final IResultCallback<byte[]> callback;
+
+    public GetRecordDescriptorWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+            IResultCallback<byte[]> callback) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.rsId = rsId;
+        this.callback = callback;
+    }
+
+    @Override
+    public void doRun() {
+        final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
+        ccs.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    byte[] recDesc = dds.getRecordDescriptor(jobId, rsId);
+                    callback.setValue(recDesc);
+                } catch (HyracksDataException e) {
+                    callback.setException(e);
+                }
+            }
+        });
+    }
+}
\ No newline at end of file