ResultWriterOperator should provide the record descriptor of its input buffers at run time to serializer and it shouldn't be statically provided to the factory provider at static query compilation time.

This is because the final output of the results being generated may be different than the
statically known record descriptor in some queries like aggregate queries, so we need
to dynamically provide the record descriptor at runtime.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3055 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
index a6ebf01..5e62612 100644
--- a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
+++ b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
@@ -16,10 +16,7 @@
 
 import java.io.Serializable;
 
-import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
-import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 
 public interface IResultSerializerFactoryProvider extends Serializable {
     /**
@@ -36,5 +33,5 @@
      * @return A new instance of result serialized appender.
      */
     public IResultSerializerFactory getAqlResultSerializerFactoryProvider(int[] fields,
-            IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc);
+            IPrinterFactory[] printerFactories, IAWriterFactory writerFactory);
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java
index 92cf569..1fbf00f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java
@@ -25,5 +25,5 @@
      *            - A print stream object to which the serialized results will be written.
      * @return A new instance of result serialized appender.
      */
-    public IResultSerializer createResultSerializer(PrintStream printStream);
+    public IResultSerializer createResultSerializer(RecordDescriptor recordDesc, PrintStream printStream);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index e500307..821f527 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -42,16 +42,13 @@
 
     private final boolean ordered;
 
-    private final RecordDescriptor recordDescriptor;
-
     private final IResultSerializerFactory resultSerializerFactory;
 
     public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
-            RecordDescriptor recordDescriptor, IResultSerializerFactory resultSerializerFactory) throws IOException {
+            IResultSerializerFactory resultSerializerFactory) throws IOException {
         super(spec, 1, 0);
         this.rsId = rsId;
         this.ordered = ordered;
-        this.recordDescriptor = recordDescriptor;
         this.resultSerializerFactory = resultSerializerFactory;
     }
 
@@ -66,9 +63,11 @@
         frameOutputStream.reset(outputBuffer, true);
         PrintStream printStream = new PrintStream(frameOutputStream);
 
-        final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(printStream);
+        final RecordDescriptor outRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+        final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(outRecordDesc,
+                printStream);
 
-        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDesc);
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             IFrameWriter datasetPartitionWriter;