Refactor ResultWriterOperatorDescriptor to use the new interface names (IResultSerializer, IResultSerializerFactory) and the rewritten FrameOutputStream, which now takes different constructor arguments.
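
In outline, the new per-partition wiring looks like this (a sketch assembled from the calls in the diff below, not a drop-in snippet; the serializer writes a tuple's text into the frame-backed stream, and FrameOutputStream.appendTuple() then tries to commit it to the current frame):

    ByteBuffer outputBuffer = ctx.allocateFrame();
    FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getFrameSize());
    frameOutputStream.reset(outputBuffer, true);
    PrintStream printStream = new PrintStream(frameOutputStream);

    IResultSerializer resultSerializer =
            resultSerializerFactory.createResultSerializer(printStream);

    // Per tuple: serialize into the stream, then try to commit it to the frame.
    resultSerializer.appendTuple(frameTupleAccessor, tIndex);
    if (!frameOutputStream.appendTuple()) {
        // The current frame is full: flush it to the dataset partition writer,
        // reset the stream on the same buffer, and retry the tuple once.
        datasetPartitionWriter.nextFrame(outputBuffer);
        frameOutputStream.reset(outputBuffer, true);
        resultSerializer.appendTuple(frameTupleAccessor, tIndex);
        frameOutputStream.appendTuple();
    }

As the TODO in the diff notes, this still assumes that no single serialized record is longer than a frame.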

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2980 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 66b8fbd..7518449 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -15,24 +15,23 @@
 package edu.uci.ics.hyracks.dataflow.std.result;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializedAppender;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializedAppenderFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameOutputStream;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameOutputStream;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
@@ -45,8 +44,7 @@
 
     private final RecordDescriptor recordDescriptor;
 
-    private final IResultSerializedAppenderFactory resultSerializedAppenderFactory;
-
+    private final IResultSerializerFactory resultSerializerFactory;
 
     public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
             RecordDescriptor recordDescriptor, IResultSerializerFactory resultSerializerFactory) throws IOException {
@@ -54,7 +52,7 @@
         this.rsId = rsId;
         this.ordered = ordered;
         this.recordDescriptor = recordDescriptor;
-        this.resultSerializedAppenderFactory = resultSerializedAppenderFactory;
+        this.resultSerializerFactory = resultSerializerFactory;
     }
 
     @Override
@@ -62,16 +60,13 @@
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
         final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
 
-        final ByteBuffer tempBuffer = ctx.allocateFrame();
-        tempBuffer.clear();
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
 
-        final FrameTupleAppender frameTupleAppender = new FrameTupleAppender(ctx.getFrameSize());
-        frameTupleAppender.reset(tempBuffer, true);
+        final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getFrameSize());
+        frameOutputStream.reset(outputBuffer, true);
+        PrintStream printStream = new PrintStream(frameOutputStream);
 
-        final FrameOutputStream frameOutputStream = new FrameOutputStream(frameTupleAppender);
-
-        final IResultSerializedAppender resultSerializedAppender = resultSerializedAppenderFactory
-                .createResultSerializer(frameOutputStream);
+        final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(printStream);
 
         final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
 
@@ -92,14 +87,16 @@
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 frameTupleAccessor.reset(buffer);
                 for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
-                    if (!resultSerializedAppender.appendTuple(frameTupleAccessor, tIndex)) {
-                        datasetPartitionWriter.nextFrame(tempBuffer);
-                        frameTupleAppender.reset(tempBuffer, true);
+                    resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+                    if (!frameOutputStream.appendTuple()) {
+                        datasetPartitionWriter.nextFrame(outputBuffer);
+                        frameOutputStream.reset(outputBuffer, true);
 
-                        /* TODO(madhusudancs): This works under the assumption that no single JSON-ified record is
+                        /* TODO(madhusudancs): This works under the assumption that no single serialized record is
                          * longer than the buffer size.
                          */
-                        resultSerializedAppender.appendTuple(frameTupleAccessor, tIndex);
+                        resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+                        frameOutputStream.appendTuple();
                     }
                 }
             }
@@ -111,9 +108,9 @@
 
             @Override
             public void close() throws HyracksDataException {
-                if (frameTupleAppender.getTupleCount() > 0) {
-                    datasetPartitionWriter.nextFrame(tempBuffer);
-                    frameTupleAppender.reset(tempBuffer, true);
+                if (frameOutputStream.getTupleCount() > 0) {
+                    datasetPartitionWriter.nextFrame(outputBuffer);
+                    frameOutputStream.reset(outputBuffer, true);
                 }
                 datasetPartitionWriter.close();
             }