[ASTERIXDB-2402][COMP] Allow AND for IsMissing in Groupby

AccessMethodUtils.findLOJIsMissingFuncInGroupBy()
looks for the not(is-missing($VAR)) pattern
in a group by, but failed to detect it if it is part of
an and, e.g. and(not(is-missing($VAR1)),not(is-missing($VAR2)))

This changes fixes this and adds a test case

Change-Id: I9547fba5e4ba02226b5b2c2504080b091d3b8d5e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2735
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
index 6368058..728aef6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -64,6 +64,7 @@
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -81,6 +82,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -1483,44 +1485,84 @@
         return createRectangleExpr;
     }
 
-    public static ScalarFunctionCallExpression findLOJIsMissingFuncInGroupBy(GroupByOperator lojGroupbyOp)
-            throws AlgebricksException {
-        //find IS_MISSING function of which argument has the nullPlaceholder variable in the nested plan of groupby.
-        ALogicalPlanImpl subPlan = (ALogicalPlanImpl) lojGroupbyOp.getNestedPlans().get(0);
-        Mutable<ILogicalOperator> subPlanRootOpRef = subPlan.getRoots().get(0);
-        AbstractLogicalOperator subPlanRootOp = (AbstractLogicalOperator) subPlanRootOpRef.getValue();
-        boolean foundSelectNonMissing = false;
-        ScalarFunctionCallExpression isMissingFuncExpr = null;
-        AbstractLogicalOperator inputOp = subPlanRootOp;
-        while (inputOp != null) {
-            if (inputOp.getOperatorTag() == LogicalOperatorTag.SELECT) {
-                SelectOperator selectOp = (SelectOperator) inputOp;
-                if (selectOp.getCondition().getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                    if (((AbstractFunctionCallExpression) selectOp.getCondition().getValue()).getFunctionIdentifier()
-                            .equals(AlgebricksBuiltinFunctions.NOT)) {
-                        ScalarFunctionCallExpression notFuncExpr =
-                                (ScalarFunctionCallExpression) selectOp.getCondition().getValue();
-                        if (notFuncExpr.getArguments().get(0).getValue()
-                                .getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                            if (((AbstractFunctionCallExpression) notFuncExpr.getArguments().get(0).getValue())
-                                    .getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.IS_MISSING)) {
-                                isMissingFuncExpr =
-                                        (ScalarFunctionCallExpression) notFuncExpr.getArguments().get(0).getValue();
-                                if (isMissingFuncExpr.getArguments().get(0).getValue()
-                                        .getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                                    foundSelectNonMissing = true;
-                                    break;
-                                }
-                            }
+    private static ScalarFunctionCallExpression getNestedIsMissingCall(AbstractFunctionCallExpression call,
+            OptimizableOperatorSubTree rightSubTree) throws AlgebricksException {
+        ScalarFunctionCallExpression isMissingFuncExpr;
+        if (call.getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.NOT)) {
+            if (call.getArguments().get(0).getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                if (((AbstractFunctionCallExpression) call.getArguments().get(0).getValue()).getFunctionIdentifier()
+                        .equals(AlgebricksBuiltinFunctions.IS_MISSING)) {
+                    isMissingFuncExpr = (ScalarFunctionCallExpression) call.getArguments().get(0).getValue();
+                    if (isMissingFuncExpr.getArguments().get(0).getValue()
+                            .getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                        LogicalVariable var =
+                                ((VariableReferenceExpression) isMissingFuncExpr.getArguments().get(0).getValue())
+                                        .getVariableReference();
+                        List<LogicalVariable> liveSubplanVars = new ArrayList<>();
+                        VariableUtilities.getSubplanLocalLiveVariables(rightSubTree.getRoot(), liveSubplanVars);
+                        if (liveSubplanVars.contains(var)) {
+                            return isMissingFuncExpr;
                         }
                     }
                 }
             }
-            inputOp = inputOp.getInputs().size() > 0 ? (AbstractLogicalOperator) inputOp.getInputs().get(0).getValue()
-                    : null;
         }
+        return null;
+    }
 
-        if (!foundSelectNonMissing) {
+    public static ScalarFunctionCallExpression findIsMissingInSubplan(AbstractLogicalOperator inputOp,
+            OptimizableOperatorSubTree rightSubTree) throws AlgebricksException {
+        ScalarFunctionCallExpression isMissingFuncExpr = null;
+        AbstractLogicalOperator currentOp = inputOp;
+        while (currentOp != null) {
+            if (currentOp.getOperatorTag() == LogicalOperatorTag.SELECT) {
+                SelectOperator selectOp = (SelectOperator) currentOp;
+                if (selectOp.getCondition().getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    AbstractFunctionCallExpression call =
+                            (AbstractFunctionCallExpression) (selectOp).getCondition().getValue();
+                    if (call.getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.AND)) {
+                        for (Mutable<ILogicalExpression> mexpr : call.getArguments()) {
+                            if (mexpr.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                                isMissingFuncExpr = getNestedIsMissingCall(
+                                        (AbstractFunctionCallExpression) mexpr.getValue(), rightSubTree);
+                                if (isMissingFuncExpr != null) {
+                                    return isMissingFuncExpr;
+                                }
+                            }
+                        }
+                    }
+                    isMissingFuncExpr = getNestedIsMissingCall(call, rightSubTree);
+                    if (isMissingFuncExpr != null) {
+                        return isMissingFuncExpr;
+                    }
+                }
+            } else if (currentOp.hasNestedPlans()) {
+                AbstractOperatorWithNestedPlans nestedPlanOp = (AbstractOperatorWithNestedPlans) currentOp;
+                for (ILogicalPlan nestedPlan : nestedPlanOp.getNestedPlans()) {
+                    for (Mutable<ILogicalOperator> root : nestedPlan.getRoots()) {
+                        isMissingFuncExpr =
+                                findIsMissingInSubplan((AbstractLogicalOperator) root.getValue(), rightSubTree);
+                        if (isMissingFuncExpr != null) {
+                            return isMissingFuncExpr;
+                        }
+                    }
+                }
+            }
+            currentOp = currentOp.getInputs().isEmpty() ? null
+                    : (AbstractLogicalOperator) currentOp.getInputs().get(0).getValue();
+        }
+        return isMissingFuncExpr;
+    }
+
+    public static ScalarFunctionCallExpression findLOJIsMissingFuncInGroupBy(GroupByOperator lojGroupbyOp,
+            OptimizableOperatorSubTree rightSubTree) throws AlgebricksException {
+        //find IS_MISSING function of which argument has the nullPlaceholder variable in the nested plan of groupby.
+        ALogicalPlanImpl subPlan = (ALogicalPlanImpl) lojGroupbyOp.getNestedPlans().get(0);
+        Mutable<ILogicalOperator> subPlanRootOpRef = subPlan.getRoots().get(0);
+        AbstractLogicalOperator subPlanRootOp = (AbstractLogicalOperator) subPlanRootOpRef.getValue();
+        ScalarFunctionCallExpression isMissingFuncExpr = findIsMissingInSubplan(subPlanRootOp, rightSubTree);
+
+        if (isMissingFuncExpr == null) {
             throw CompilationException.create(ErrorCode.CANNOT_FIND_NON_MISSING_SELECT_OPERATOR,
                     lojGroupbyOp.getSourceLocation());
         }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
index bc73199..324668d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
@@ -380,8 +380,8 @@
                     // in GroupByOp.
                     if (isThisOpLeftOuterJoin && isParentOpGroupBy) {
                         analysisCtx.setLOJGroupbyOpRef(opRef);
-                        ScalarFunctionCallExpression isNullFuncExpr =
-                                AccessMethodUtils.findLOJIsMissingFuncInGroupBy((GroupByOperator) opRef.getValue());
+                        ScalarFunctionCallExpression isNullFuncExpr = AccessMethodUtils
+                                .findLOJIsMissingFuncInGroupBy((GroupByOperator) opRef.getValue(), rightSubTree);
                         analysisCtx.setLOJIsMissingFuncInGroupBy(isNullFuncExpr);
                     }
 
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/ASTERIXDB-2402.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/ASTERIXDB-2402.sqlpp
new file mode 100644
index 0000000..366540e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/ASTERIXDB-2402.sqlpp
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+  location: circle,
+  userName: string,
+  timeStamp: datetime
+};
+
+
+create type UserLocationFeedType as {
+  location: circle,
+  userName: string
+};
+
+create type EmergencyReport as {
+  reportId: uuid,
+  Etype: string,
+  location: circle,
+  timeStamp: datetime
+};
+
+create type EmergencyReportFeedType as {
+  Etype: string,
+  location: circle
+};
+
+
+create type EmergencyShelter as {
+  shelterName: string,
+  location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index location_time on UserLocations(timeStamp);
+create index u_location on UserLocations(location) type RTREE;
+create index s_location on Shelters(location) type RTREE;
+create index report_time on Reports(timeStamp);
+
+create function EmergenciesNearMe(userName) {
+  (
+  select report, shelters from
+   ( select value r from Reports r where r.timeStamp >
+   current_datetime() - day_time_duration("PT10S"))report,
+  UserLocations u
+    let shelters = (select s.location from Shelters s where spatial_intersect(s.location,u.location))
+  where u.userName = userName
+  and spatial_intersect(report.location,u.location)
+  )
+};
+
+create type result as {
+  resultId:uuid
+};
+create type channelSub as {
+  channelSubId:uuid
+};
+create type brokerSub as {
+  channelSubId:uuid,
+  brokerSubId:uuid
+};
+create type broke as {
+  DataverseName: string,
+  BrokerName: string,
+  BrokerEndpoint: string
+};
+
+create dataset EmergenciesNearMeChannelResults(result) primary key resultId autogenerated;
+create dataset EmergenciesNearMeChannelChannelSubscriptions(channelSub) primary key channelSubId;
+create dataset EmergenciesNearMeChannelBrokerSubscriptions(brokerSub) primary key channelSubId,brokerSubId;
+create dataset Broker(broke) primary key DataverseName,BrokerName;
+
+
+
+SET inline_with "false";
+insert into channels.EmergenciesNearMeChannelResults as a (
+with channelExecutionTime as current_datetime()
+select result, channelExecutionTime, sub.channelSubId as channelSubId,current_datetime() as deliveryTime,
+(select b.BrokerEndPoint, bs.brokerSubId from
+channels.EmergenciesNearMeChannelBrokerSubscriptions bs,
+channels.Broker b
+where bs.BrokerName = b.BrokerName
+and bs.DataverseName = b.DataverseName
+and bs.channelSubId = sub.channelSubId
+) as brokerSubIds
+from channels.EmergenciesNearMeChannelChannelSubscriptions sub,
+channels.EmergenciesNearMe(sub.param0) result
+) returning
+(select
+a.channelExecutionTime, a.result, sub.BrokerEndpoint
+from (select sub from a.brokerSubIds sub) sub
+group by sub.BrokerEndpoint
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan
new file mode 100644
index 0000000..fced7a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-2402.plan
@@ -0,0 +1,197 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- SUBPLAN  |PARTITIONED|
+              {
+                -- AGGREGATE  |LOCAL|
+                  -- ASSIGN  |LOCAL|
+                    -- MICRO_PRE_CLUSTERED_GROUP_BY[$$183]  |LOCAL|
+                            {
+                              -- AGGREGATE  |LOCAL|
+                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                            }
+                      -- IN_MEMORY_STABLE_SORT [$$183(ASC)]  |LOCAL|
+                        -- ASSIGN  |LOCAL|
+                          -- UNNEST  |LOCAL|
+                            -- SUBPLAN  |LOCAL|
+                                    {
+                                      -- AGGREGATE  |LOCAL|
+                                        -- ASSIGN  |LOCAL|
+                                          -- UNNEST  |LOCAL|
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                    }
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+              }
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- COMMIT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- INSERT_DELETE  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$167]  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- PRE_CLUSTERED_GROUP_BY[$$221]  |PARTITIONED|
+                                              {
+                                                -- AGGREGATE  |LOCAL|
+                                                  -- STREAM_SELECT  |LOCAL|
+                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                              }
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- STABLE_SORT [$$221(ASC)]  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$221]  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- HYBRID_HASH_JOIN [$$267][$$190]  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE [$$267]  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- UNNEST  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- PRE_CLUSTERED_GROUP_BY[$$275]  |PARTITIONED|
+                                                                        {
+                                                                          -- AGGREGATE  |LOCAL|
+                                                                            -- MICRO_PRE_CLUSTERED_GROUP_BY[$$277, $$279]  |LOCAL|
+                                                                                    {
+                                                                                      -- AGGREGATE  |LOCAL|
+                                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                    }
+                                                                              -- STREAM_SELECT  |LOCAL|
+                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                        }
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- STABLE_SORT [$$275(ASC), $$277(ASC), $$279(ASC)]  |PARTITIONED|
+                                                                      -- HASH_PARTITION_EXCHANGE [$$275]  |PARTITIONED|
+                                                                        -- UNION_ALL  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- STREAM_SELECT  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- BTREE_SEARCH  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- SPLIT  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- RTREE_SEARCH  |PARTITIONED|
+                                                                                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- STABLE_SORT [$$289(ASC)]  |PARTITIONED|
+                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                -- RTREE_SEARCH  |PARTITIONED|
+                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                        -- NESTED_LOOP  |PARTITIONED|
+                                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                                                    -- ASSIGN  |UNPARTITIONED|
+                                                                                                                                                      -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                                                                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                                                              -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                    -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                        -- STABLE_SORT [$$228(ASC)]  |PARTITIONED|
+                                                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- STREAM_SELECT  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- SPLIT  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- RTREE_SEARCH  |PARTITIONED|
+                                                                                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                                                                        -- ASSIGN  |PARTITIONED|
+                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- STABLE_SORT [$$289(ASC)]  |PARTITIONED|
+                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                          -- RTREE_SEARCH  |PARTITIONED|
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- NESTED_LOOP  |PARTITIONED|
+                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- ASSIGN  |UNPARTITIONED|
+                                                                                                                                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                                                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                  -- STABLE_SORT [$$228(ASC)]  |PARTITIONED|
+                                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                          -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE [$$190]  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- HYBRID_HASH_JOIN [$$198, $$200][$$192, $$193]  |PARTITIONED|
+                                                              -- HASH_PARTITION_EXCHANGE [$$198, $$200]  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file