Reverting the merge of fullstack_hyracks_result_distribution branch until all the tests pass.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3033 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-common/pom.xml b/algebricks/algebricks-common/pom.xml
index c93fe3e..a5677a5 100644
--- a/algebricks/algebricks-common/pom.xml
+++ b/algebricks/algebricks-common/pom.xml
@@ -16,8 +16,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-compiler/pom.xml b/algebricks/algebricks-compiler/pom.xml
index f1f4521..8dc083d 100644
--- a/algebricks/algebricks-compiler/pom.xml
+++ b/algebricks/algebricks-compiler/pom.xml
@@ -16,8 +16,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-core/pom.xml b/algebricks/algebricks-core/pom.xml
index 118ea57..a74a540 100644
--- a/algebricks/algebricks-core/pom.xml
+++ b/algebricks/algebricks-core/pom.xml
@@ -16,8 +16,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 32cfb9a..a969372 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -7,7 +7,6 @@
     BTREE_SEARCH,
     STATS,
     DATASOURCE_SCAN,
-    DISTRIBUTE_RESULT,
     EMPTY_TUPLE_SOURCE,
     EXTERNAL_GROUP_BY,
     IN_MEMORY_HASH_JOIN,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 82187e3..899b633 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -51,10 +51,6 @@
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
             throws AlgebricksException;
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
-            JobSpecification spec) throws AlgebricksException;
-
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
             JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
deleted file mode 100644
index d835da4..0000000
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
-import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class DistributeResultPOperator extends AbstractPhysicalOperator {
-
-    @Override
-    public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.DISTRIBUTE_RESULT;
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return false;
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        ILogicalOperator op2 = op.getInputs().get(0).getValue();
-        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
-        WriteOperator write = (WriteOperator) op;
-        IDataSink sink = write.getDataSink();
-        IPartitioningProperty pp = sink.getPartitioningProperty();
-        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
-        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        WriteOperator resultOp = (WriteOperator) op;
-        IMetadataProvider mp = context.getMetadataProvider();
-
-        JobSpecification spec = builder.getJobSpec();
-
-        int[] columns = new int[resultOp.getExpressions().size()];
-        int i = 0;
-        for (Mutable<ILogicalExpression> exprRef : resultOp.getExpressions()) {
-            ILogicalExpression expr = exprRef.getValue();
-            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                throw new NotImplementedException("Only writing variable expressions is supported.");
-            }
-            VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
-            LogicalVariable v = varRef.getVariableReference();
-            columns[i++] = inputSchemas[0].findVariable(v);
-        }
-        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
-                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
-
-        IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op),
-                context, columns);
-
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
-                resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
-
-        builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
-        ILogicalOperator src = resultOp.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, resultOp, 0);
-    }
-}
diff --git a/algebricks/algebricks-data/pom.xml b/algebricks/algebricks-data/pom.xml
index 3f7592a..3d927d9 100644
--- a/algebricks/algebricks-data/pom.xml
+++ b/algebricks/algebricks-data/pom.xml
@@ -16,8 +16,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
deleted file mode 100644
index a6ebf01..0000000
--- a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.data;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
-import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public interface IResultSerializerFactoryProvider extends Serializable {
-    /**
-     * Returns a result serializer factory
-     * 
-     * @param fields
-     *            - A position of the fields in the order it should be written in the output.
-     * @param printerFactories
-     *            - A printer factory array to print the tuple containing different fields.
-     * @param writerFactory
-     *            - A writer factory to write the serialized data to the print stream.
-     * @param inputRecordDesc
-     *            - The record descriptor describing the input frame to be serialized.
-     * @return A new instance of result serialized appender.
-     */
-    public IResultSerializerFactory getAqlResultSerializerFactoryProvider(int[] fields,
-            IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc);
-}
diff --git a/algebricks/algebricks-examples/piglet-example/pom.xml b/algebricks/algebricks-examples/piglet-example/pom.xml
index ea36fb6..954938a 100644
--- a/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -16,8 +16,8 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.7</source>
-					<target>1.7</target>
+					<source>1.6</source>
+					<target>1.6</target>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 0efe0ae..d678803 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -62,10 +62,13 @@
 
     @SuppressWarnings("unchecked")
     @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<String> dataSource,
-            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
-            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
-            throws AlgebricksException {
+	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
+			IDataSource<String> dataSource,
+			List<LogicalVariable> scanVariables,
+			List<LogicalVariable> projectVariables, boolean projectPushed,
+			IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+			JobGenContext context, JobSpecification jobSpec)
+			throws AlgebricksException {
         PigletFileDataSource ds = (PigletFileDataSource) dataSource;
 
         FileSplit[] fileSplits = ds.getFileSplits();
@@ -136,13 +139,6 @@
     }
 
     @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
-            JobSpecification spec) throws AlgebricksException {
-        return null;
-    }
-
-    @Override
     public IDataSourceIndex<String, String> findDataSourceIndex(String indexId, String dataSourceId)
             throws AlgebricksException {
         return null;
@@ -191,7 +187,7 @@
         // TODO Auto-generated method stub
         return null;
     }
-
+    
     @Override
     public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
         return FN_MAP.get(fid);
diff --git a/algebricks/algebricks-rewriter/pom.xml b/algebricks/algebricks-rewriter/pom.xml
index 448221d..41979d3 100644
--- a/algebricks/algebricks-rewriter/pom.xml
+++ b/algebricks/algebricks-rewriter/pom.xml
@@ -16,8 +16,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 666f01a..38cf96e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -38,7 +38,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -51,6 +50,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamDiePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
@@ -242,7 +242,7 @@
                     break;
                 }
                 case WRITE: {
-                    op.setPhysicalOperator(new DistributeResultPOperator());
+                    op.setPhysicalOperator(new SinkWritePOperator());
                     break;
                 }
                 case WRITE_RESULT: {
@@ -267,8 +267,8 @@
                     List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
                     getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
                     getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
-                    op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys, opInsDel
-                            .getFilterExpression(), opInsDel.getDataSourceIndex()));
+                    op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys, 
+                    		opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
                     break;
                 }
                 case SINK: {
diff --git a/algebricks/algebricks-runtime/pom.xml b/algebricks/algebricks-runtime/pom.xml
index 70243d2..e40dfb0 100644
--- a/algebricks/algebricks-runtime/pom.xml
+++ b/algebricks/algebricks-runtime/pom.xml
@@ -16,8 +16,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index 28597a7..148f087 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -70,7 +70,6 @@
         for (int t = 0; t < nTuple; t++) {
             try {
                 writer.printTuple(tAccess, t);
-                printStream.println();
             } catch (AlgebricksException ae) {
                 throw new HyracksDataException(ae);
             }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
index 762e133..9c53241 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
@@ -47,6 +47,7 @@
                     }
                     printers[i].print(tAccess.getBuffer().array(), fldStart, fldLen, printStream);
                 }
+                printStream.println();
             }
         };
     }
diff --git a/algebricks/algebricks-tests/pom.xml b/algebricks/algebricks-tests/pom.xml
index 228baa9..19e6711 100644
--- a/algebricks/algebricks-tests/pom.xml
+++ b/algebricks/algebricks-tests/pom.xml
@@ -16,8 +16,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
       <plugin>