Fix for issues 838, 841, 527

These fixes help to recognize and rewrite plans that should use index lookups rather than nested loops or outer joins

Change-Id: Icad72c10e7c6afbc46cfb014c6198429d4e338e2
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/215
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index e6e610d..4e880585 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -91,6 +91,7 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceProjectsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.LeftOuterJoinToInnerJoinRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.MoveFreeVariableOperatorOutOfSubplanRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.NestedSubplanToJoinRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushAssignBelowUnionAllRule;
@@ -137,6 +138,7 @@
         normalization.add(new ExtractGbyExpressionsRule());
         normalization.add(new ExtractDistinctByExpressionsRule());
         normalization.add(new ExtractOrderExpressionsRule());
+        normalization.add(new MoveFreeVariableOperatorOutOfSubplanRule());
 
         // IntroduceStaticTypeCastRule should go before
         // IntroduceDynamicTypeCastRule to
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
index 6e5cb02..7474e61 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
@@ -180,6 +180,7 @@
         if (gby.getNestedPlans().get(0).getRoots().size() != 1) {
             return false;
         }
+
         AbstractLogicalOperator nestedPlanRoot = (AbstractLogicalOperator) gby.getNestedPlans().get(0).getRoots()
                 .get(0).getValue();
         if (nestedPlanRoot.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
@@ -190,6 +191,7 @@
         if (agg.getVariables().size() > 1) {
             return false;
         }
+
         LogicalVariable aggVar = agg.getVariables().get(0);
         ILogicalExpression aggFun = agg.getExpressions().get(0).getValue();
         if (!aggVar.equals(unnestedVar)
@@ -217,6 +219,11 @@
 
         LogicalVariable posVar = unnest1.getPositionalVariable();
         if (posVar == null) {
+            ArrayList<ILogicalOperator> neededAssigns = new ArrayList<ILogicalOperator>();
+
+            if (agg.getInputs().get(0).getValue().getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                getNeededAssigns(agg.getInputs().get(0).getValue(), neededAssigns);
+            }
 
             // create assignment for group-by keys
             ArrayList<LogicalVariable> gbyKeyAssgnVars = new ArrayList<LogicalVariable>();
@@ -238,7 +245,15 @@
             }
 
             OrderOperator order = new OrderOperator(orderExprs);
-            order.getInputs().add(new MutableObject<ILogicalOperator>(gbyKeyAssign));
+
+            if (neededAssigns.size() < 1) {
+                order.getInputs().add(new MutableObject<ILogicalOperator>(gbyKeyAssign));
+            } else {
+                order.getInputs().add(new MutableObject<ILogicalOperator>(neededAssigns.get(0)));
+                neededAssigns.get(neededAssigns.size() - 1).getInputs().clear();
+                neededAssigns.get(neededAssigns.size() - 1).getInputs()
+                        .add(new MutableObject<ILogicalOperator>(gbyKeyAssign));
+            }
 
             opRef.setValue(assign);
             assign.getInputs().add(new MutableObject<ILogicalOperator>(order));
@@ -282,4 +297,11 @@
 
         return true;
     }
+
+    private void getNeededAssigns(ILogicalOperator assign, ArrayList<ILogicalOperator> assigns) {
+        assigns.add(assign);
+        if (assign.getInputs().get(0).getValue().getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+            getNeededAssigns(assign.getInputs().get(0).getValue(), assigns);
+        }
+    }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
index 412903a..4eaf738 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
@@ -191,7 +191,9 @@
         }
 
         List<Mutable<ILogicalExpression>> srcAssignExprRefs = new LinkedList<Mutable<ILogicalExpression>>();
-        fingAggFuncExprRef(assignOp.getExpressions(), aggVar, srcAssignExprRefs);
+        if (fingAggFuncExprRef(assignOp.getExpressions(), aggVar, srcAssignExprRefs) == false) {
+            return false;
+        }
         if (srcAssignExprRefs.isEmpty()) {
             return false;
         }
@@ -226,10 +228,17 @@
         return true;
     }
 
-    private void fingAggFuncExprRef(List<Mutable<ILogicalExpression>> exprRefs, LogicalVariable aggVar,
+    private boolean fingAggFuncExprRef(List<Mutable<ILogicalExpression>> exprRefs, LogicalVariable aggVar,
             List<Mutable<ILogicalExpression>> srcAssignExprRefs) {
         for (Mutable<ILogicalExpression> exprRef : exprRefs) {
             ILogicalExpression expr = exprRef.getValue();
+
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                if (((VariableReferenceExpression) expr).getVariableReference().equals(aggVar)) {
+                    return false;
+                }
+            }
+
             if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
                 continue;
             }
@@ -238,7 +247,9 @@
                     .getFunctionIdentifier());
             if (funcIdent == null) {
                 // Recursively look in func args.
-                fingAggFuncExprRef(funcExpr.getArguments(), aggVar, srcAssignExprRefs);
+                if (fingAggFuncExprRef(funcExpr.getArguments(), aggVar, srcAssignExprRefs) == false) {
+                    return false;
+                }
 
             } else {
                 // Check if this is the expr that uses aggVar.
@@ -249,5 +260,6 @@
                 }
             }
         }
+        return true;
     }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
index 3ae3235..39ff6eb 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -149,8 +149,8 @@
     @Override
     public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
-            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean isLeftOuterJoin)
-            throws AlgebricksException {
+            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean isLeftOuterJoin,
+            boolean hasGroupBy) throws AlgebricksException {
         AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) joinRef.getValue();
         Mutable<ILogicalExpression> conditionRef = joinOp.getCondition();
         // Determine if the index is applicable on the left or right side (if both, we arbitrarily prefer the left side).
@@ -185,7 +185,7 @@
             return false;
         }
 
-        if (isLeftOuterJoin) {
+        if (isLeftOuterJoin && hasGroupBy) {
             //reset the null place holder variable
             AccessMethodUtils.resetLOJNullPlaceholderVariableInGroupByOp(analysisCtx, newNullPlaceHolderVar, context);
         }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IAccessMethod.java
index 1419f33..36b87e4 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IAccessMethod.java
@@ -79,11 +79,13 @@
 
     /**
      * Applies the plan transformation to use chosenIndex to optimize a join query.
+     * In the case of a LeftOuterJoin, there may or may not be a needed groupby operation
+     * If there is, we will need to include it in the transformation
      */
     public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
-            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean isLeftOuterJoin)
-            throws AlgebricksException;
+            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean isLeftOuterJoin,
+            boolean hasGroupBy) throws AlgebricksException;
 
     /**
      * Analyzes expr to see whether it is optimizable by the given concrete index.
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
index 025c2e2..f113d97 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.asterix.optimizer.rules.am;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -58,11 +59,10 @@
  * 4. Choose an index to apply (for now only a single index will be chosen).
  * 5. Rewrite plan using index (delegated to IAccessMethods).
  * For left-outer-join, additional patterns are checked and additional treatment is needed as follows:
- * 1. Since left-outer-join operators always have groupby operator on top of it,
- * first it checks a pattern: (groupby) <-- (leftouterjoin)
+ * 1. First it checks if there is a groupByOp above the join: (groupby) <-- (leftouterjoin)
  * 2. Inherently, only the right-subtree of the lojOp can be used as indexSubtree.
  * So, the right-subtree must have at least one applicable index on join field(s)
- * 3. The null placeholder variable introduced in groupByOp should be taken care of correctly.
+ * 3. If there is a groupByOp, the null placeholder variable introduced in groupByOp should be taken care of correctly.
  * Here, the primary key variable from datasourceScanOp replaces the introduced null placeholder variable.
  * If the primary key is composite key, then the first variable of the primary key variables becomes the
  * null place holder variable. This null placeholder variable works for all three types of indexes.
@@ -75,6 +75,7 @@
     protected final OptimizableOperatorSubTree leftSubTree = new OptimizableOperatorSubTree();
     protected final OptimizableOperatorSubTree rightSubTree = new OptimizableOperatorSubTree();
     protected boolean isLeftOuterJoin = false;
+    protected boolean hasGroupBy = true;
 
     // Register access methods.
     protected static Map<FunctionIdentifier, List<IAccessMethod>> accessMethods = new HashMap<FunctionIdentifier, List<IAccessMethod>>();
@@ -129,6 +130,25 @@
         }
         pruneIndexCandidates(analyzedAMs);
 
+        //Remove possibly chosen indexes from left Tree
+        if (isLeftOuterJoin) {
+            Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
+            // Check applicability of indexes by access method type.
+            while (amIt.hasNext()) {
+                Map.Entry<IAccessMethod, AccessMethodAnalysisContext> entry = amIt.next();
+                AccessMethodAnalysisContext amCtx = entry.getValue();
+                Iterator<Map.Entry<Index, List<Integer>>> indexIt = amCtx.indexExprs.entrySet().iterator();
+                while (indexIt.hasNext()) {
+                    Map.Entry<Index, List<Integer>> indexEntry = indexIt.next();
+
+                    Index chosenIndex = indexEntry.getKey();
+                    if (!chosenIndex.getDatasetName().equals(rightSubTree.dataset.getDatasetName())) {
+                        indexIt.remove();
+                    }
+                }
+            }
+        }
+
         // Choose index to be applied.
         Pair<IAccessMethod, Index> chosenIndex = chooseIndex(analyzedAMs);
         if (chosenIndex == null) {
@@ -139,16 +159,15 @@
         // Apply plan transformation using chosen index.
         AccessMethodAnalysisContext analysisCtx = analyzedAMs.get(chosenIndex.first);
 
-        //For LOJ, prepare objects to reset LOJ nullPlaceHolderVariable in GroupByOp 
-        if (isLeftOuterJoin) {
+        //For LOJ with GroupBy, prepare objects to reset LOJ nullPlaceHolderVariable in GroupByOp
+        if (isLeftOuterJoin && hasGroupBy) {
             analysisCtx.setLOJGroupbyOpRef(opRef);
             ScalarFunctionCallExpression isNullFuncExpr = AccessMethodUtils
                     .findLOJIsNullFuncInGroupBy((GroupByOperator) opRef.getValue());
             analysisCtx.setLOJIsNullFuncInGroupBy(isNullFuncExpr);
         }
-
         boolean res = chosenIndex.first.applyJoinPlanTransformation(joinRef, leftSubTree, rightSubTree,
-                chosenIndex.second, analysisCtx, context, isLeftOuterJoin);
+                chosenIndex.second, analysisCtx, context, isLeftOuterJoin, hasGroupBy);
         if (res) {
             OperatorPropertiesUtil.typeOpRec(opRef, context);
         }
@@ -195,8 +214,17 @@
     }
 
     private boolean isLeftOuterJoin(AbstractLogicalOperator op1) {
-        return (op1.getOperatorTag() == LogicalOperatorTag.GROUP && ((AbstractLogicalOperator) op1.getInputs().get(0)
-                .getValue()).getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN);
+        if (op1.getInputs().size() != 1) {
+            return false;
+        }
+        if (((AbstractLogicalOperator) op1.getInputs().get(0).getValue()).getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) {
+            return false;
+        }
+        if (op1.getOperatorTag() == LogicalOperatorTag.GROUP) {
+            return true;
+        }
+        hasGroupBy = false;
+        return true;
     }
 
     private boolean isInnerJoin(AbstractLogicalOperator op1) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 5042f80..7b3dc3c 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -120,7 +120,7 @@
     @Override
     public boolean analyzeFuncExprArgs(AbstractFunctionCallExpression funcExpr,
             List<AbstractLogicalOperator> assignsAndUnnests, AccessMethodAnalysisContext analysisCtx) {
-        
+
         if (funcExpr.getFunctionIdentifier() == AsterixBuiltinFunctions.CONTAINS) {
             boolean matches = AccessMethodUtils.analyzeFuncExprArgsForOneConstAndVar(funcExpr, analysisCtx);
             if (!matches) {
@@ -418,8 +418,8 @@
     @Override
     public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
-            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean isLeftOuterJoin)
-            throws AlgebricksException {
+            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean isLeftOuterJoin,
+            boolean hasGroupBy) throws AlgebricksException {
         // Figure out if the index is applicable on the left or right side (if both, we arbitrarily prefer the left side).
         Dataset dataset = analysisCtx.indexDatasetMap.get(chosenIndex);
         // Determine probe and index subtrees based on chosen index.
@@ -451,7 +451,7 @@
 
         //if LOJ, reset null place holder variable
         LogicalVariable newNullPlaceHolderVar = null;
-        if (isLeftOuterJoin) {
+        if (isLeftOuterJoin && hasGroupBy) {
             //get a new null place holder variable that is the first field variable of the primary key 
             //from the indexSubTree's datasourceScanOp
             newNullPlaceHolderVar = indexSubTree.getDataSourceVariables().get(0);
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
index 1eb1f2f..ac4bf70 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
@@ -108,8 +108,8 @@
     @Override
     public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
-            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean isLeftOuterJoin)
-            throws AlgebricksException {
+            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean isLeftOuterJoin,
+            boolean hasGroupBy) throws AlgebricksException {
         // Determine if the index is applicable on the left or right side (if both, we arbitrarily prefer the left side).
         Dataset dataset = analysisCtx.indexDatasetMap.get(chosenIndex);
         // Determine probe and index subtrees based on chosen index.
@@ -144,7 +144,7 @@
             return false;
         }
 
-        if (isLeftOuterJoin) {
+        if (isLeftOuterJoin && hasGroupBy) {
             //reset the null place holder variable
             AccessMethodUtils.resetLOJNullPlaceholderVariableInGroupByOp(analysisCtx, newNullPlaceHolderVar, context);
         }
diff --git a/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/query-issue838.aql b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/query-issue838.aql
new file mode 100644
index 0000000..24e4adf
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/query-issue838.aql
@@ -0,0 +1,45 @@
+/*
+ * Description  : This test case is to verify the fix for issue838
+ * https://code.google.com/p/asterixdb/issues/detail?id=838
+ * Expected Res : SUCCESS
+ * Date         : 18 Dec. 2014
+ */
+drop dataverse twitter if exists;
+create dataverse twitter;
+use dataverse twitter;
+
+create type TweetMessageType as closed {
+  tweetid: int64,
+  sender-location: point,
+  text: string
+}
+
+create type TweetHistorySubscription as open{
+  subscription-id: int32,
+  location: point
+}
+
+create dataset TweetHistorySubscriptions(TweetHistorySubscription)
+primary key subscription-id;
+create index testa on TweetHistorySubscriptions(location) type rtree;
+
+create dataset TweetMessages(TweetMessageType)
+primary key tweetid;
+create index locationIdx on TweetMessages(sender-location) type rtree;
+
+write output to nc1:"rttest/query-issue838.adm";
+
+for $sub in dataset TweetHistorySubscriptions
+let $location := $sub.location
+for $text in (
+  for $tweet in dataset TweetMessages
+    let $circle := create-circle($location,30.0)
+    where spatial-intersect($tweet.sender-location, $circle)
+    return $tweet
+)
+return {
+    "subscription-id":$sub.subscription-id,
+    "changeSet":1,
+    "execution-time":current-datetime(),
+    "message-text":$text
+}
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/query-issue838.plan b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/query-issue838.plan
new file mode 100644
index 0000000..34c71b9
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/query-issue838.plan
@@ -0,0 +1,26 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$18(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$18(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- BTREE_SEARCH  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- STABLE_SORT [$$29(ASC)]  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- RTREE_SEARCH  |PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file