Merged fullstack_asterix_stabilization -r 2933:3157

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@3164 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-common/pom.xml b/algebricks/algebricks-common/pom.xml
index a5677a5..521ef12 100644
--- a/algebricks/algebricks-common/pom.xml
+++ b/algebricks/algebricks-common/pom.xml
@@ -16,8 +16,9 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-compiler/pom.xml b/algebricks/algebricks-compiler/pom.xml
index 8dc083d..bd35835 100644
--- a/algebricks/algebricks-compiler/pom.xml
+++ b/algebricks/algebricks-compiler/pom.xml
@@ -16,8 +16,9 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-core/pom.xml b/algebricks/algebricks-core/pom.xml
index a74a540..def5b35 100644
--- a/algebricks/algebricks-core/pom.xml
+++ b/algebricks/algebricks-core/pom.xml
@@ -16,8 +16,9 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 5234d2c..b8bdf3e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -20,6 +20,7 @@
     CLUSTER,
     DATASOURCESCAN,
     DISTINCT,
+    DISTRIBUTE_RESULT,
     GROUP,
     EMPTYTUPLESOURCE,
     EXCHANGE,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index a969372..32cfb9a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -7,6 +7,7 @@
     BTREE_SEARCH,
     STATS,
     DATASOURCE_SCAN,
+    DISTRIBUTE_RESULT,
     EMPTY_TUPLE_SOURCE,
     EXTERNAL_GROUP_BY,
     IN_MEMORY_HASH_JOIN,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 899b633..82187e3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -51,6 +51,10 @@
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
             throws AlgebricksException;
 
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException;
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
             JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
new file mode 100644
index 0000000..6ca6d87
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class DistributeResultOperator extends AbstractLogicalOperator {
+    private List<Mutable<ILogicalExpression>> expressions;
+    private IDataSink dataSink;
+
+    public DistributeResultOperator(List<Mutable<ILogicalExpression>> expressions, IDataSink dataSink) {
+        this.expressions = expressions;
+        this.dataSink = dataSink;
+    }
+
+    public List<Mutable<ILogicalExpression>> getExpressions() {
+        return expressions;
+    }
+
+    public IDataSink getDataSink() {
+        return dataSink;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.DISTRIBUTE_RESULT;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitDistributeResultOperator(this, arg);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        boolean modif = false;
+        for (int i = 0; i < expressions.size(); i++) {
+            boolean b = visitor.transform(expressions.get(i));
+            if (b) {
+                modif = true;
+            }
+        }
+        return modif;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+
+    @Override
+    public boolean isMap() {
+        return false; // actually depends on the physical op.
+    }
+
+    @Override
+    public void recomputeSchema() {
+        schema = new ArrayList<LogicalVariable>();
+        schema.addAll(inputs.get(0).getValue().getSchema());
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 0539cbe..1b4be1e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -47,8 +47,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -64,7 +66,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -476,6 +477,14 @@
     }
 
     @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        // propagateFDsAndEquivClasses(op, ctx);
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
     public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext ctx) throws AlgebricksException {
         // propagateFDsAndEquivClasses(op, ctx);
         setEmptyFDsEqClasses(op, ctx);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index b97597d..ac6d887 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
@@ -425,6 +426,17 @@
     }
 
     @Override
+    public Boolean visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT)
+            return Boolean.FALSE;
+        DistributeResultOperator writeOpArg = (DistributeResultOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), writeOpArg.getSchema());
+        return isomorphic;
+    }
+
+    @Override
     public Boolean visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT)
@@ -762,6 +774,14 @@
         }
 
         @Override
+        public ILogicalOperator visitDistributeResultOperator(DistributeResultOperator op, Void arg)
+                throws AlgebricksException {
+            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+            deepCopyExpressionRefs(newExpressions, op.getExpressions());
+            return new DistributeResultOperator(newExpressions, op.getDataSink());
+        }
+
+        @Override
         public ILogicalOperator visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
             ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             deepCopyExpressionRefs(newKeyExpressions, op.getKeyExpressions());
@@ -784,8 +804,8 @@
             deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
             List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
-            Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(((AbstractLogicalExpression)op.getFilterExpression())
-                    .cloneExpression());
+            Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
+                    ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
             return new IndexInsertDeleteOperator(op.getDataSourceIndex(), newPrimaryKeyExpressions,
                     newSecondaryKeyExpressions, newFilterExpression, op.getOperation());
         }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 562bb4c..b9544c7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -37,8 +37,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -54,7 +56,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -230,6 +231,13 @@
     }
 
     @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
+    @Override
     public Void visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 9b2f5a0..8f1d686 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -29,8 +29,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -46,7 +48,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -240,6 +241,12 @@
     }
 
     @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext arg)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 78b6801..31adcba 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -32,8 +32,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -49,7 +51,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -223,6 +224,11 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {

         return null;

     }

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index a759e35..cd0cee3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -31,8 +31,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -48,7 +50,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -238,6 +239,12 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {

+        standardLayout(op);

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {

         standardLayout(op);

         return null;

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 11e56ca..69fb3f8 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -33,8 +33,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -51,7 +53,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -335,6 +336,16 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)

+            throws AlgebricksException {

+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {

+            e.getValue().substituteVar(pair.first, pair.second);

+        }

+        substVarTypes(op, pair);

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)

             throws AlgebricksException {

         op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 0ea9367..5361a19 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

@@ -304,6 +305,14 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) {

+        for (Mutable<ILogicalExpression> expr : op.getExpressions()) {

+            expr.getValue().getUsedVariables(usedVariables);

+        }

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Void arg) {

         op.getPayloadExpression().getValue().getUsedVariables(usedVariables);

         for (Mutable<ILogicalExpression> e : op.getKeyExpressions()) {

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
new file mode 100644
index 0000000..302d4d2
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class DistributeResultPOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.DISTRIBUTE_RESULT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        DistributeResultOperator write = (DistributeResultOperator) op;
+        IDataSink sink = write.getDataSink();
+        IPartitioningProperty pp = sink.getPartitioningProperty();
+        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
+        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        DistributeResultOperator resultOp = (DistributeResultOperator) op;
+        IMetadataProvider mp = context.getMetadataProvider();
+
+        JobSpecification spec = builder.getJobSpec();
+
+        int[] columns = new int[resultOp.getExpressions().size()];
+        int i = 0;
+        for (Mutable<ILogicalExpression> exprRef : resultOp.getExpressions()) {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new NotImplementedException("Only writing variable expressions is supported.");
+            }
+            VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+            LogicalVariable v = varRef.getVariableReference();
+            columns[i++] = inputSchemas[0].findVariable(v);
+        }
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+        IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op),
+                context, columns);
+
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
+                resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
+
+        builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = resultOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, resultOp, 0);
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index a94c78e..fc0c433 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -30,8 +30,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -48,7 +50,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -164,6 +165,13 @@
     }
 
     @Override
+    public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("distribute result ").append(op.getExpressions());
+        return buffer.toString();
+    }
+
+    @Override
     public String visitWriteResultOperator(WriteResultOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("load ").append(op.getDataSource()).append(" from ")
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 6b5949e..23dac2a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -20,8 +20,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -37,7 +39,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -97,6 +98,8 @@
 
     public R visitWriteOperator(WriteOperator op, T arg) throws AlgebricksException;
 
+    public R visitDistributeResultOperator(DistributeResultOperator op, T arg) throws AlgebricksException;
+
     public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
 
     public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
diff --git a/algebricks/algebricks-data/pom.xml b/algebricks/algebricks-data/pom.xml
index 3d927d9..9536416 100644
--- a/algebricks/algebricks-data/pom.xml
+++ b/algebricks/algebricks-data/pom.xml
@@ -16,8 +16,9 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
new file mode 100644
index 0000000..5e62612
--- /dev/null
+++ b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.data;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+
+public interface IResultSerializerFactoryProvider extends Serializable {
+    /**
+     * Returns a result serializer factory
+     * 
+     * @param fields
+     *            - A position of the fields in the order it should be written in the output.
+     * @param printerFactories
+     *            - A printer factory array to print the tuple containing different fields.
+     * @param writerFactory
+     *            - A writer factory to write the serialized data to the print stream.
+     * @param inputRecordDesc
+     *            - The record descriptor describing the input frame to be serialized.
+     * @return A new instance of result serialized appender.
+     */
+    public IResultSerializerFactory getAqlResultSerializerFactoryProvider(int[] fields,
+            IPrinterFactory[] printerFactories, IAWriterFactory writerFactory);
+}
diff --git a/algebricks/algebricks-examples/piglet-example/pom.xml b/algebricks/algebricks-examples/piglet-example/pom.xml
index 954938a..ca7467b 100644
--- a/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -16,8 +16,9 @@
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
 				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
+					<source>1.7</source>
+					<target>1.7</target>
+					<fork>true</fork>
 				</configuration>
 			</plugin>
 			<plugin>
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
index d105759..2981157 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
@@ -127,9 +127,9 @@
             }
         });
         builder.setTypeTraitProvider(new ITypeTraitProvider() {
-			public ITypeTraits getTypeTrait(Object type) {
-				return null;
-			}
+            public ITypeTraits getTypeTrait(Object type) {
+                return null;
+            }
         });
         builder.setPrinterProvider(PigletPrinterFactoryProvider.INSTANCE);
         builder.setExpressionRuntimeProvider(new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index d678803..15b290e 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -145,6 +145,13 @@
     }
 
     @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
             IDataSource<String> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
             LogicalVariable payLoadVar, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
diff --git a/algebricks/algebricks-rewriter/pom.xml b/algebricks/algebricks-rewriter/pom.xml
index 41979d3..7968773 100644
--- a/algebricks/algebricks-rewriter/pom.xml
+++ b/algebricks/algebricks-rewriter/pom.xml
@@ -16,8 +16,9 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 38cf96e..60a4fbb 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -245,6 +246,10 @@
                     op.setPhysicalOperator(new SinkWritePOperator());
                     break;
                 }
+                case DISTRIBUTE_RESULT: {
+                    op.setPhysicalOperator(new DistributeResultPOperator());
+                    break;
+                }
                 case WRITE_RESULT: {
                     WriteResultOperator opLoad = (WriteResultOperator) op;
                     LogicalVariable payload;
@@ -267,8 +272,8 @@
                     List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
                     getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
                     getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
-                    op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys, 
-                    		opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+                    op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys, opInsDel
+                            .getFilterExpression(), opInsDel.getDataSourceIndex()));
                     break;
                 }
                 case SINK: {
diff --git a/algebricks/algebricks-runtime/pom.xml b/algebricks/algebricks-runtime/pom.xml
index e40dfb0..e438283 100644
--- a/algebricks/algebricks-runtime/pom.xml
+++ b/algebricks/algebricks-runtime/pom.xml
@@ -16,8 +16,9 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
new file mode 100644
index 0000000..4f28c81
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.serializer;
+
+import java.io.PrintStream;
+import java.nio.BufferOverflowException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ResultSerializerFactoryProvider implements IResultSerializerFactoryProvider {
+    private static final long serialVersionUID = 1L;
+
+    public static final ResultSerializerFactoryProvider INSTANCE = new ResultSerializerFactoryProvider();
+
+    private ResultSerializerFactoryProvider() {
+    }
+
+    @Override
+    public IResultSerializerFactory getAqlResultSerializerFactoryProvider(final int[] fields,
+            final IPrinterFactory[] printerFactories, final IAWriterFactory writerFactory) {
+        return new IResultSerializerFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IResultSerializer createResultSerializer(RecordDescriptor inputRecordDesc, PrintStream printStream) {
+                final IAWriter writer = writerFactory.createWriter(fields, printStream, printerFactories,
+                        inputRecordDesc);
+
+                return new IResultSerializer() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public void init() throws HyracksDataException {
+                        try {
+                            writer.init();
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+
+                    @Override
+                    public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
+                        try {
+                            writer.printTuple(tAccess, tIdx);
+                        } catch (BufferOverflowException e) {
+                            return false;
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                        return true;
+                    }
+                };
+            }
+        };
+    }
+}
diff --git a/algebricks/algebricks-tests/pom.xml b/algebricks/algebricks-tests/pom.xml
index 19e6711..c114881 100644
--- a/algebricks/algebricks-tests/pom.xml
+++ b/algebricks/algebricks-tests/pom.xml
@@ -16,9 +16,11 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
-        </configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
+	  <encoding>UTF-8</encoding>
+	</configuration>
       </plugin>
       <plugin>
         <artifactId>maven-antrun-plugin</artifactId>
diff --git a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index 1807dd1..2855eb2 100644
--- a/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -55,6 +55,7 @@
         ncConfig1.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
         ncConfig1.clusterNetIPAddress = "127.0.0.1";
         ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -64,6 +65,7 @@
         ncConfig2.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
         ncConfig2.clusterNetIPAddress = "127.0.0.1";
         ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
@@ -84,4 +86,4 @@
         hcc.waitForCompletion(jobId);
     }
 
-}
\ No newline at end of file
+}