Use the DistributeResultOperator for result delivery instead of squeezing in the functionality into WriteOperator.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1299 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
index 4af1424..d8477f1 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/base/LogicalOperatorDeepCopyVisitor.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
@@ -60,7 +61,7 @@
 
     // Key: Variable in the original plan. Value: Variable with which to replace original variable in the plan copy.
     private final Map<LogicalVariable, LogicalVariable> inVarMapping;
-    
+
     public LogicalOperatorDeepCopyVisitor(Counter counter) {
         this.counter = counter;
         this.inVarMapping = Collections.emptyMap();
@@ -396,6 +397,11 @@
     }
 
     @Override
+    public ILogicalOperator visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public ILogicalOperator visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) {
         throw new UnsupportedOperationException();
     }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
index 762fa8f..23a7a18 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
@@ -126,6 +126,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -139,7 +140,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -203,7 +203,7 @@
             writeExprList.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(resVar)));
             ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId(), resultNodeName);
             ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
-            topOp = new WriteOperator(writeExprList, sink);
+            topOp = new DistributeResultOperator(writeExprList, sink);
             topOp.getInputs().add(new MutableObject<ILogicalOperator>(project));
         } else {