Pass the serialized record descriptor all the way upto dataset directory service in the RPC chain.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2829 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 4c36f27..bc67146 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -25,9 +25,11 @@
 
     private final JobId jobId;
 
+    private final ResultSetId rsId;
+
     private final boolean orderedResult;
 
-    private final ResultSetId rsId;
+    private final byte[] serializedRecordDescriptor;
 
     private final int partition;
 
@@ -35,12 +37,14 @@
 
     private final NetworkAddress networkAddress;
 
-    public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, boolean orderedResult,
-            ResultSetId rsId, int partition, int nPartitions, NetworkAddress networkAddress) {
+    public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+            boolean orderedResult, byte[] serializedRecordDescriptor, int partition, int nPartitions,
+            NetworkAddress networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
-        this.orderedResult = orderedResult;
         this.rsId = rsId;
+        this.orderedResult = orderedResult;
+        this.serializedRecordDescriptor = serializedRecordDescriptor;
         this.partition = partition;
         this.nPartitions = nPartitions;
         this.networkAddress = networkAddress;
@@ -48,8 +52,8 @@
 
     @Override
     public void run() {
-        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, orderedResult, rsId, partition,
-                nPartitions, networkAddress);
+        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult,
+                serializedRecordDescriptor, partition, nPartitions, networkAddress);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index a79110d..b9dd665 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -48,8 +48,9 @@
 
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 
-    public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
-            int nPartitions, NetworkAddress addr) throws Exception;
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+            byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress)
+            throws Exception;
 
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 2d48155..5d84683 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -445,9 +445,11 @@
 
         private final JobId jobId;
 
+        private final ResultSetId rsId;
+
         private final boolean orderedResult;
 
-        private final ResultSetId rsId;
+        private final byte[] serializedRecordDescriptor;
 
         private final int partition;
 
@@ -455,11 +457,12 @@
 
         private NetworkAddress networkAddress;
 
-        public RegisterResultPartitionLocationFunction(JobId jobId, boolean orderedResult, ResultSetId rsId,
-                int partition, int nPartitions, NetworkAddress networkAddress) {
+        public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
+                byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress) {
             this.jobId = jobId;
-            this.orderedResult = orderedResult;
             this.rsId = rsId;
+            this.orderedResult = orderedResult;
+            this.serializedRecordDescriptor = serializedRecordDescriptor;
             this.partition = partition;
             this.nPartitions = nPartitions;
             this.networkAddress = networkAddress;
@@ -474,14 +477,18 @@
             return jobId;
         }
 
-        public boolean getOrderedResult() {
-            return orderedResult;
-        }
-
         public ResultSetId getResultSetId() {
             return rsId;
         }
 
+        public byte[] getSerializedRecordDescriptor() {
+            return serializedRecordDescriptor;
+        }
+
+        public boolean getOrderedResult() {
+            return orderedResult;
+        }
+
         public int getPartition() {
             return partition;
         }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ad3a82e..a5efcff 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -97,10 +97,11 @@
     }
 
     @Override
-    public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
-            int nPartitions, NetworkAddress networkAddress) throws Exception {
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+            byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress)
+            throws Exception {
         CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
-                jobId, orderedResult, rsId, partition, nPartitions, networkAddress);
+                jobId, rsId, orderedResult, serializedRecordDescriptor, partition, nPartitions, networkAddress);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index b9a8b69..ed8a278 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -50,13 +50,14 @@
     }
 
     @Override
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, boolean orderedResult, ResultSetId rsId,
-            int partition, int nPartitions) throws HyracksException {
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+            byte[] serializedRecordDescriptor, int partition, int nPartitions) throws HyracksException {
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
         try {
-            ncs.getClusterController().registerResultPartitionLocation(jobId, orderedResult, rsId, partition,
-                    nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+            ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult,
+                    serializedRecordDescriptor, partition, nPartitions,
+                    ncs.getDatasetNetworkManager().getNetworkAddress());
             dpw = new DatasetPartitionWriter(ctx, this, partition, executor);
 
             DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);