Separate out WriteOperator and create a new DistributeResultOperator for result distribution.

We are creating this distinction to make sure that the old write results to a
file operator remains intact for those systems that use Algebricks but have
no need for result distribution like Hivestrix.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3053 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 5234d2c..b8bdf3e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -20,6 +20,7 @@
     CLUSTER,
     DATASOURCESCAN,
     DISTINCT,
+    DISTRIBUTE_RESULT,
     GROUP,
     EMPTYTUPLESOURCE,
     EXCHANGE,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 0539cbe..1b4be1e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -47,8 +47,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -64,7 +66,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -476,6 +477,14 @@
     }
 
     @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        // propagateFDsAndEquivClasses(op, ctx);
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
     public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext ctx) throws AlgebricksException {
         // propagateFDsAndEquivClasses(op, ctx);
         setEmptyFDsEqClasses(op, ctx);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index b97597d..ac6d887 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
@@ -425,6 +426,17 @@
     }
 
     @Override
+    public Boolean visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT)
+            return Boolean.FALSE;
+        DistributeResultOperator writeOpArg = (DistributeResultOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), writeOpArg.getSchema());
+        return isomorphic;
+    }
+
+    @Override
     public Boolean visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT)
@@ -762,6 +774,14 @@
         }
 
         @Override
+        public ILogicalOperator visitDistributeResultOperator(DistributeResultOperator op, Void arg)
+                throws AlgebricksException {
+            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+            deepCopyExpressionRefs(newExpressions, op.getExpressions());
+            return new DistributeResultOperator(newExpressions, op.getDataSink());
+        }
+
+        @Override
         public ILogicalOperator visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
             ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             deepCopyExpressionRefs(newKeyExpressions, op.getKeyExpressions());
@@ -784,8 +804,8 @@
             deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
             List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
-            Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(((AbstractLogicalExpression)op.getFilterExpression())
-                    .cloneExpression());
+            Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
+                    ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
             return new IndexInsertDeleteOperator(op.getDataSourceIndex(), newPrimaryKeyExpressions,
                     newSecondaryKeyExpressions, newFilterExpression, op.getOperation());
         }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 562bb4c..b9544c7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -37,8 +37,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -54,7 +56,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -230,6 +231,13 @@
     }
 
     @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
+    @Override
     public Void visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 9b2f5a0..8f1d686 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -29,8 +29,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -46,7 +48,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -240,6 +241,12 @@
     }
 
     @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext arg)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 78b6801..31adcba 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -32,8 +32,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -49,7 +51,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -223,6 +224,11 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {

         return null;

     }

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index a759e35..cd0cee3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -31,8 +31,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -48,7 +50,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -238,6 +239,12 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {

+        standardLayout(op);

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {

         standardLayout(op);

         return null;

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 11e56ca..69fb3f8 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -33,8 +33,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -51,7 +53,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -335,6 +336,16 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)

+            throws AlgebricksException {

+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {

+            e.getValue().substituteVar(pair.first, pair.second);

+        }

+        substVarTypes(op, pair);

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)

             throws AlgebricksException {

         op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 0ea9367..5361a19 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

@@ -304,6 +305,14 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) {

+        for (Mutable<ILogicalExpression> expr : op.getExpressions()) {

+            expr.getValue().getUsedVariables(usedVariables);

+        }

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Void arg) {

         op.getPayloadExpression().getValue().getUsedVariables(usedVariables);

         for (Mutable<ILogicalExpression> e : op.getKeyExpressions()) {

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index d835da4..302d4d2 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -30,8 +30,8 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -65,7 +65,7 @@
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
-        WriteOperator write = (WriteOperator) op;
+        DistributeResultOperator write = (DistributeResultOperator) op;
         IDataSink sink = write.getDataSink();
         IPartitioningProperty pp = sink.getPartitioningProperty();
         StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
@@ -77,7 +77,7 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        WriteOperator resultOp = (WriteOperator) op;
+        DistributeResultOperator resultOp = (DistributeResultOperator) op;
         IMetadataProvider mp = context.getMetadataProvider();
 
         JobSpecification spec = builder.getJobSpec();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index a94c78e..fc0c433 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -30,8 +30,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -48,7 +50,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -164,6 +165,13 @@
     }
 
     @Override
+    public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("distribute result ").append(op.getExpressions());
+        return buffer.toString();
+    }
+
+    @Override
     public String visitWriteResultOperator(WriteResultOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("load ").append(op.getDataSource()).append(" from ")
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 6b5949e..23dac2a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -20,8 +20,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -37,7 +39,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -97,6 +98,8 @@
 
     public R visitWriteOperator(WriteOperator op, T arg) throws AlgebricksException;
 
+    public R visitDistributeResultOperator(DistributeResultOperator op, T arg) throws AlgebricksException;
+
     public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
 
     public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 666f01a..60a4fbb 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -51,6 +51,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamDiePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
@@ -242,6 +243,10 @@
                     break;
                 }
                 case WRITE: {
+                    op.setPhysicalOperator(new SinkWritePOperator());
+                    break;
+                }
+                case DISTRIBUTE_RESULT: {
                     op.setPhysicalOperator(new DistributeResultPOperator());
                     break;
                 }