To serialize results on the server side, obtain the printer factories for the result fields and pass them to the metadata provider's result-handling runtime.

To achieve this, the compiler provides a serializer factory provider to
Algebricks, which obtains the serializer factory from it and passes the
factory on to the actual operator descriptor. The operator runtime then
uses the factory to create the serializer object, which serializes each
result tuple as it is produced.
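
A hedged sketch (not part of this patch) of how an Algebricks-based compiler
could implement the new hook; the serializer-factory provider, the writer
factory, and the ResultSetId handling below are assumptions made only for
illustration (imports from java.io and the Hyracks/Algebricks API omitted):

    @Override
    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
            JobSpecification spec) throws AlgebricksException {
        // Build the serializer factory from the printer factories; the provider and writer-factory
        // fields are assumed to exist in the implementing metadata provider.
        IResultSerializedAppenderFactory serializerFactory = resultSerializerProvider
                .getAqlResultAppenderFactoryProvider(printColumns, printerFactories, writerFactory, inputDesc);
        try {
            // The ResultSetId would normally come from the data sink; a constant is used here for brevity.
            IOperatorDescriptor resultWriter = new ResultWriterOperatorDescriptor(spec, new ResultSetId(1), ordered,
                    inputDesc, serializerFactory);
            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(resultWriter, null);
        } catch (IOException e) {
            throw new AlgebricksException(e);
        }
    }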

We implement these interfaces in Asterix; for Hivesterix and Pregelix we
simply return null from the result-handling runtime because those
projects do not handle results.
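
For reference, a minimal sketch of the kind of serializer factory such an
implementation might supply. The IPrinter-based field loop is an assumption
(a real implementation would also call init() on each printer and handle
frame overflow by returning false); only the IResultSerializedAppender and
IResultSerializedAppenderFactory contracts come from this change (imports
omitted):

    public static IResultSerializedAppenderFactory createPrinterBasedSerializerFactory(final int[] fields,
            final IPrinterFactory[] printerFactories) {
        return new IResultSerializedAppenderFactory() {
            private static final long serialVersionUID = 1L;

            @Override
            public IResultSerializedAppender createResultSerializer(OutputStream outputStream) {
                final PrintStream printStream = new PrintStream(outputStream);
                final IPrinter[] printers = new IPrinter[printerFactories.length];
                for (int i = 0; i < printerFactories.length; i++) {
                    printers[i] = printerFactories[i].createPrinter();
                }
                return new IResultSerializedAppender() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
                        try {
                            // Print each requested field of the tuple into the stream backed by the output frame.
                            for (int i = 0; i < fields.length; i++) {
                                int fieldStart = tAccess.getTupleStartOffset(tIdx) + tAccess.getFieldSlotsLength()
                                        + tAccess.getFieldStartOffset(tIdx, fields[i]);
                                int fieldLength = tAccess.getFieldEndOffset(tIdx, fields[i])
                                        - tAccess.getFieldStartOffset(tIdx, fields[i]);
                                printers[i].print(tAccess.getBuffer().array(), fieldStart, fieldLength, printStream);
                            }
                            printStream.println();
                        } catch (AlgebricksException e) {
                            throw new HyracksDataException(e);
                        }
                        return true;
                    }
                };
            }
        };
    }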

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2975 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 1011920..82187e3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -52,7 +52,8 @@
             throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            RecordDescriptor recordDescriptor, boolean ordered, JobSpecification spec) throws AlgebricksException;
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index f78150a..d835da4 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -14,13 +14,20 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
 
+import org.apache.commons.lang3.mutable.Mutable;
+
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
@@ -32,6 +39,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -73,11 +81,26 @@
         IMetadataProvider mp = context.getMetadataProvider();
 
         JobSpecification spec = builder.getJobSpec();
-        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
-                inputSchemas[0], context);
+
+        int[] columns = new int[resultOp.getExpressions().size()];
+        int i = 0;
+        for (Mutable<ILogicalExpression> exprRef : resultOp.getExpressions()) {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new NotImplementedException("Only writing variable expressions is supported.");
+            }
+            VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+            LogicalVariable v = varRef.getVariableReference();
+            columns[i++] = inputSchemas[0].findVariable(v);
+        }
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+        IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op),
+                context, columns);
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
-                resultOp.getDataSink(), recordDescriptor, false, spec);
+                resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
 
         builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
         builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
diff --git a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializedAppenderFactoryProvider.java b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializedAppenderFactoryProvider.java
new file mode 100644
index 0000000..4b65822
--- /dev/null
+++ b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializedAppenderFactoryProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.data;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializedAppenderFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public interface IResultSerializedAppenderFactoryProvider extends Serializable {
+    /**
+     * Creates a result serialized appender factory.
+     * 
+     * @param fields
+     *            - Positions of the fields in the order they should be written to the output.
+     * @param printerFactories
+     *            - An array of printer factories, one per output field, used to print the tuple.
+     * @param writerFactory
+     *            - A writer factory used to write the serialized data to the print stream.
+     * @param inputRecordDesc
+     *            - The record descriptor describing the input frames to be serialized.
+     * @return A new instance of a result serialized appender factory.
+     */
+    public IResultSerializedAppenderFactory getAqlResultAppenderFactoryProvider(int[] fields,
+            IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc);
+}
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index f7dcaa0..0efe0ae 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -137,8 +137,8 @@
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            RecordDescriptor recordDescriptor, boolean ordered, JobSpecification spec)
-            throws AlgebricksException {
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException {
         return null;
     }
 
diff --git a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
index cb4918d..6948769 100644
--- a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
+++ b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -32,124 +32,105 @@
 @SuppressWarnings("rawtypes")
 public class HiveMetaDataProvider<S, T> implements IMetadataProvider<S, T> {
 
-	private Operator fileSink;
-	private Schema outputSchema;
-	private HashMap<S, IDataSource<S>> dataSourceMap;
+    private Operator fileSink;
+    private Schema outputSchema;
+    private HashMap<S, IDataSource<S>> dataSourceMap;
 
-	public HiveMetaDataProvider(Operator fsOp, Schema oi,
-			HashMap<S, IDataSource<S>> map) {
-		fileSink = fsOp;
-		outputSchema = oi;
-		dataSourceMap = map;
-	}
-
-	@Override
-	public IDataSourceIndex<T, S> findDataSourceIndex(T indexId, S dataSourceId)
-			throws AlgebricksException {
-		return null;
-	}
-
-	@Override
-	public IDataSource<S> findDataSource(S id) throws AlgebricksException {
-		return dataSourceMap.get(id);
-	}
-
-	@Override
-	public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource) {
-		return true;
-	}
-
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
-			IDataSource<S> dataSource, List<LogicalVariable> scanVariables,
-			List<LogicalVariable> projectVariables, boolean projectPushed,
-			IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-			JobGenContext context, JobSpecification jobSpec)
-			throws AlgebricksException {
-
-		S desc = dataSource.getId();
-		HiveScanRuntimeGenerator generator = new HiveScanRuntimeGenerator(
-				(PartitionDesc) desc);
-		return generator.getRuntimeOperatorAndConstraint(dataSource,
-				scanVariables, projectVariables, projectPushed, context,
-				jobSpec);
-	}
-
-	@Override
-	public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(
-			IDataSink sink, int[] printColumns,
-			IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
-
-		HiveWriteRuntimeGenerator generator = new HiveWriteRuntimeGenerator(
-				(FileSinkOperator) fileSink, outputSchema);
-		return generator.getWriterRuntime(inputDesc);
-	}
-
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
-			IDataSource<S> arg0, IOperatorSchema arg1,
-			List<LogicalVariable> arg2, LogicalVariable arg3,
-			RecordDescriptor arg4, JobGenContext arg5, JobSpecification arg6)
-			throws AlgebricksException {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
-			IDataSource<S> arg0, IOperatorSchema arg1,
-			List<LogicalVariable> arg2, LogicalVariable arg3,
-			RecordDescriptor arg4, JobGenContext arg5, JobSpecification arg6)
-			throws AlgebricksException {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
-			IDataSource<S> arg0, IOperatorSchema arg1,
-			List<LogicalVariable> arg2, LogicalVariable arg3,
-			JobGenContext arg4, JobSpecification arg5)
-			throws AlgebricksException {
-		// TODO Auto-generated method stub
-		return null;
-	}
+    public HiveMetaDataProvider(Operator fsOp, Schema oi, HashMap<S, IDataSource<S>> map) {
+        fileSink = fsOp;
+        outputSchema = oi;
+        dataSourceMap = map;
+    }
 
     @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            RecordDescriptor recordDescriptor, boolean ordered, JobSpecification spec) throws AlgebricksException {
+    public IDataSourceIndex<T, S> findDataSourceIndex(T indexId, S dataSourceId) throws AlgebricksException {
         return null;
     }
 
-   	@Override
-	public IFunctionInfo lookupFunction(FunctionIdentifier arg0) {
-		return new HiveFunctionInfo(arg0, null);
-	}
+    @Override
+    public IDataSource<S> findDataSource(S id) throws AlgebricksException {
+        return dataSourceMap.get(id);
+    }
 
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
-			IDataSourceIndex<T, S> dataSource,
-			IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
-			IVariableTypeEnvironment typeEnv,
-			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
-			RecordDescriptor recordDesc, JobGenContext context,
-			JobSpecification spec) throws AlgebricksException {
-		// TODO Auto-generated method stub
-		return null;
-	}
+    @Override
+    public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource) {
+        return true;
+    }
 
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
-			IDataSourceIndex<T, S> dataSource,
-			IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
-			IVariableTypeEnvironment typeEnv,
-			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
-			RecordDescriptor recordDesc, JobGenContext context,
-			JobSpecification spec) throws AlgebricksException {
-		// TODO Auto-generated method stub
-		return null;
-	}
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
+            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
+            throws AlgebricksException {
+
+        S desc = dataSource.getId();
+        HiveScanRuntimeGenerator generator = new HiveScanRuntimeGenerator((PartitionDesc) desc);
+        return generator.getRuntimeOperatorAndConstraint(dataSource, scanVariables, projectVariables, projectPushed,
+                context, jobSpec);
+    }
+
+    @Override
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+
+        HiveWriteRuntimeGenerator generator = new HiveWriteRuntimeGenerator((FileSinkOperator) fileSink, outputSchema);
+        return generator.getWriterRuntime(inputDesc);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> arg0,
+            IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,
+            JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> arg0,
+            IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,
+            JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> arg0,
+            IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, JobGenContext arg4,
+            JobSpecification arg5) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public IFunctionInfo lookupFunction(FunctionIdentifier arg0) {
+        return new HiveFunctionInfo(arg0, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+            IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+            IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializedAppender.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializedAppender.java
new file mode 100644
index 0000000..1cfe02c
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializedAppender.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IResultSerializedAppender extends Serializable {
+    /**
+     * Method to serialize the result and append it to the provided output stream
+     * 
+     * @param tAccess
+     *            - A frame tuple accessor object that contains the original data to be serialized
+     * @param tIdx
+     *            - Index of the tuple that should be serialized.
+     * @return true if the tuple was appended successfully, else false.
+     */
+    public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializedAppenderFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializedAppenderFactory.java
new file mode 100644
index 0000000..63d861b
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializedAppenderFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.OutputStream;
+import java.io.Serializable;
+
+public interface IResultSerializedAppenderFactory extends Serializable {
+    /**
+     * Creates a result serialized appender
+     * 
+     * @param outputStream
+     *            - An output stream object to which the serialized results will be appended.
+     * @return A new instance of result serialized appender.
+     */
+    public IResultSerializedAppender createResultSerializer(OutputStream outputStream);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameOutputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameOutputStream.java
new file mode 100644
index 0000000..73e5b74
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.comm.util;
+
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+public class FrameOutputStream extends OutputStream {
+    private static final Logger LOGGER = Logger.getLogger(FrameOutputStream.class.getName());
+
+    private final FrameTupleAppender frameTupleAppender;
+
+    public FrameOutputStream(FrameTupleAppender frameTupleAppender) {
+        this.frameTupleAppender = frameTupleAppender;
+    }
+
+    @Override
+    public void write(int intByte) {
+        byte[] b = { (byte) intByte };
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(int): byte: " + b);
+        }
+        if (!frameTupleAppender.append(b, 0, 1)) {
+            throw new BufferOverflowException();
+        }
+    }
+
+    @Override
+    public void write(byte[] bytes) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(bytes[]): bytes: " + bytes);
+        }
+        if (!frameTupleAppender.append(bytes, 0, bytes.length)) {
+            throw new BufferOverflowException();
+        }
+    }
+
+    @Override
+    public void write(byte[] bytes, int offset, int length) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("write(bytes[]): bytes: " + bytes);
+        }
+        if (!frameTupleAppender.append(bytes, offset, length)) {
+            throw new BufferOverflowException();
+        }
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index dca58fc..97814aa 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -21,6 +21,8 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializedAppender;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializedAppenderFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
@@ -28,6 +30,9 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameOutputStream;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
@@ -38,13 +43,20 @@
 
     private final boolean ordered;
 
+    private final RecordDescriptor recordDescriptor;
+
+    private final IResultSerializedAppenderFactory resultSerializedAppenderFactory;
+
     private final byte[] serializedRecordDescriptor;
 
     public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
-            RecordDescriptor recordDescriptor) throws IOException {
+            RecordDescriptor recordDescriptor, IResultSerializedAppenderFactory resultSerializedAppenderFactory)
+            throws IOException {
         super(spec, 1, 0);
         this.rsId = rsId;
         this.ordered = ordered;
+        this.recordDescriptor = recordDescriptor;
+        this.resultSerializedAppenderFactory = resultSerializedAppenderFactory;
         this.serializedRecordDescriptor = JavaSerializationUtils.serialize(recordDescriptor);
     }
 
@@ -53,6 +65,19 @@
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
         final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
 
+        final ByteBuffer tempBuffer = ctx.allocateFrame();
+        tempBuffer.clear();
+
+        final FrameTupleAppender frameTupleAppender = new FrameTupleAppender(ctx.getFrameSize());
+        frameTupleAppender.reset(tempBuffer, true);
+
+        final FrameOutputStream frameOutputStream = new FrameOutputStream(frameTupleAppender);
+
+        final IResultSerializedAppender resultSerializedAppender = resultSerializedAppenderFactory
+                .createResultSerializer(frameOutputStream);
+
+        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             IFrameWriter datasetPartitionWriter;
 
@@ -69,7 +94,18 @@
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                datasetPartitionWriter.nextFrame(buffer);
+                frameTupleAccessor.reset(buffer);
+                for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                    if (!resultSerializedAppender.appendTuple(frameTupleAccessor, tIndex)) {
+                        datasetPartitionWriter.nextFrame(tempBuffer);
+                        frameTupleAppender.reset(tempBuffer, true);
+
+                        /* TODO(madhusudancs): This works under the assumption that no single JSON-ified record is
+                         * longer than the buffer size.
+                         */
+                        resultSerializedAppender.appendTuple(frameTupleAccessor, tIndex);
+                    }
+                }
             }
 
             @Override
@@ -79,6 +115,10 @@
 
             @Override
             public void close() throws HyracksDataException {
+                if (frameTupleAppender.getTupleCount() > 0) {
+                    datasetPartitionWriter.nextFrame(tempBuffer);
+                    frameTupleAppender.reset(tempBuffer, true);
+                }
                 datasetPartitionWriter.close();
             }
         };
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index c6a0510..67ac57c 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -14,10 +14,9 @@
  */
 package edu.uci.ics.hyracks.tests.integration;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -32,7 +31,9 @@
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializedAppender;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializedAppenderFactory;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -44,8 +45,6 @@
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
 public abstract class AbstractIntegrationTest {
     private static final Logger LOGGER = Logger.getLogger(AbstractIntegrationTest.class.getName());
@@ -134,43 +133,43 @@
 
         hyracksDataset.open(jobId, spec.getResultSetIds().get(0));
         byte[] serializedRecordDescriptor = hyracksDataset.getSerializedRecordDescriptor();
-/*        FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(),
-                recordDescriptor);
+        /*        FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(),
+                        recordDescriptor);
 
-        ByteBuffer readBuffer = datasetClientCtx.allocateFrame();
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream di = new DataInputStream(bbis);
+                ByteBuffer readBuffer = datasetClientCtx.allocateFrame();
+                ByteBufferInputStream bbis = new ByteBufferInputStream();
+                DataInputStream di = new DataInputStream(bbis);
 
-        while (true) {
-            readBuffer.clear();
-            int size = hyracksDataset.read(readBuffer);
-            if (size <= 0) {
-                break;
-            }
-            try {
-                frameTupleAccessor.reset(readBuffer);
-                System.out.println("Tuple count: " + recordDescriptor);
-                for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
-                    int start = frameTupleAccessor.getTupleStartOffset(tIndex)
-                            + frameTupleAccessor.getFieldSlotsLength();
-                    bbis.setByteBuffer(readBuffer, start);
-                    Object[] record = new Object[recordDescriptor.getFieldCount()];
-                    for (int i = 0; i < record.length; ++i) {
-                        Object instance = recordDescriptor.getFields()[i].deserialize(di);
-                        if (i == 0) {
-                            System.out.print(String.valueOf(instance));
-                        } else {
-                            System.out.print(", " + String.valueOf(instance));
-                        }
+                while (true) {
+                    readBuffer.clear();
+                    int size = hyracksDataset.read(readBuffer);
+                    if (size <= 0) {
+                        break;
                     }
-                    System.out.println();
+                    try {
+                        frameTupleAccessor.reset(readBuffer);
+                        System.out.println("Tuple count: " + recordDescriptor);
+                        for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                            int start = frameTupleAccessor.getTupleStartOffset(tIndex)
+                                    + frameTupleAccessor.getFieldSlotsLength();
+                            bbis.setByteBuffer(readBuffer, start);
+                            Object[] record = new Object[recordDescriptor.getFieldCount()];
+                            for (int i = 0; i < record.length; ++i) {
+                                Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                                if (i == 0) {
+                                    System.out.print(String.valueOf(instance));
+                                } else {
+                                    System.out.print(", " + String.valueOf(instance));
+                                }
+                            }
+                            System.out.println();
+                        }
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
                 }
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-        // End of my code
-*/
+                // End of my code
+        */
         hcc.waitForCompletion(jobId);
         dumpOutputFiles();
     }
@@ -200,4 +199,22 @@
         outputFiles.add(tempFile);
         return tempFile;
     }
+
+    protected IResultSerializedAppenderFactory getResultSerializedAppenderFactory() {
+        return new IResultSerializedAppenderFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IResultSerializedAppender createResultSerializer(final OutputStream frameOutputStream) {
+                return new IResultSerializedAppender() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
+                        return true;
+                    }
+                };
+            }
+        };
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 18c5ee0..ab9605b 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -75,7 +75,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, ordersDesc);
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, ordersDesc,
+                getResultSerializedAppenderFactory());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -121,7 +122,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, ordersDesc);
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, ordersDesc,
+                getResultSerializedAppenderFactory());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);