Teach result writer operator about the record descriptor.

Pass the record descriptor to the result writer operator constructor,
serialize and store it and also pass that while instantiating the
dataset partition writer.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2828 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 8c44494..dca58fc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -14,17 +14,20 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.result;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
@@ -35,10 +38,14 @@
 
     private final boolean ordered;
 
-    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered) {
+    private final byte[] serializedRecordDescriptor;
+
+    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
+            RecordDescriptor recordDescriptor) throws IOException {
         super(spec, 1, 0);
         this.rsId = rsId;
         this.ordered = ordered;
+        this.serializedRecordDescriptor = JavaSerializationUtils.serialize(recordDescriptor);
     }
 
     @Override
@@ -52,8 +59,8 @@
             @Override
             public void open() throws HyracksDataException {
                 try {
-                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, ordered, rsId, partition,
-                            nPartitions);
+                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered,
+                            serializedRecordDescriptor, partition, nPartitions);
                     datasetPartitionWriter.open();
                 } catch (HyracksException e) {
                     throw new HyracksDataException(e);