[ASTERIXDB-3008][COMP] Improve translation of inner joins in subplans

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Fix performance regression caused by ASTERIXDB-3006
- Fix incorrect free variable computation by
  FreeVariableVisitor for join clauses

Change-Id: I1f9d0f453202ec79673f2f66b9034fbc6047212b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15127
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index 1d226dd..ea80ea6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -18,11 +18,9 @@
  */
 package org.apache.asterix.translator;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -125,6 +123,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -155,7 +154,6 @@
     public static final String REWRITE_IN_AS_OR_OPTION = "rewrite_in_as_or";
     private static final boolean REWRITE_IN_AS_OR_OPTION_DEFAULT = true;
 
-    private Deque<Mutable<ILogicalOperator>> uncorrelatedRightBranchStack = new ArrayDeque<>();
     private final Map<VarIdentifier, IAObject> externalVars;
     private final boolean translateInAsOr;
 
@@ -303,12 +301,10 @@
             throws CompilationException {
         Mutable<ILogicalOperator> inputSrc = arg;
         Pair<ILogicalOperator, LogicalVariable> topUnnest = null;
-        uncorrelatedRightBranchStack.push(inputSrc);
         for (FromTerm fromTerm : fromClause.getFromTerms()) {
             topUnnest = fromTerm.accept(this, inputSrc);
             inputSrc = new MutableObject<>(topUnnest.first);
         }
-        uncorrelatedRightBranchStack.pop();
         return topUnnest;
     }
 
@@ -345,8 +341,10 @@
     public Pair<ILogicalOperator, LogicalVariable> visit(JoinClause joinClause, Mutable<ILogicalOperator> leftInputRef)
             throws CompilationException {
         SourceLocation sourceLoc = joinClause.getSourceLocation();
-        if (joinClause.getJoinType() == JoinType.INNER && !context.inSubplan()) {
-            Mutable<ILogicalOperator> rightInputRef = uncorrelatedRightBranchStack.peek();
+        if (joinClause.getJoinType() == JoinType.INNER && !hasFreeVariables(joinClause.getRightExpression())) {
+            EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+            ets.setSourceLocation(joinClause.getSourceLocation());
+            Mutable<ILogicalOperator> rightInputRef = new MutableObject<>(ets);
             Pair<ILogicalOperator, LogicalVariable> rightBranch =
                     generateUnnestForBinaryCorrelateRightBranch(joinClause, rightInputRef, false, null);
             // A join operator with condition TRUE.
@@ -509,6 +507,16 @@
         }
     }
 
+    private boolean hasFreeVariables(Expression expr) throws CompilationException {
+        Set<VariableExpr> freeVars = SqlppRewriteUtil.getFreeVariable(expr);
+        for (VariableExpr varRef : freeVars) {
+            if (!SqlppVariableUtil.isExternalVariableReference(varRef)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private static IAlgebricksConstantValue translateLeftOuterMissingValue(Literal.Type type)
             throws CompilationException {
         switch (type) {
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/ch2/ch2_q8_subquery.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/ch2/ch2_q8_subquery.sqlpp
new file mode 100644
index 0000000..3b31cf7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/ch2/ch2_q8_subquery.sqlpp
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test plan for CH2 Q8
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset stock(id uuid not unknown) open type primary key `id` autogenerated;
+create dataset orders(id uuid not unknown) open type primary key `id` autogenerated;
+create dataset customer(id uuid not unknown) open type primary key `id` autogenerated;
+create dataset nation(id uuid not unknown) open type primary key `id` autogenerated;
+create dataset supplier(id uuid not unknown) open type primary key `id` autogenerated;
+create dataset item(id uuid not unknown) open type primary key `id` autogenerated;
+create dataset region(id uuid not unknown) open type primary key `id` autogenerated;
+
+SELECT
+  GET_YEAR(DATE(rn1coolis.o_entry_d)) AS l_year,
+  ROUND((SUM(CASE WHEN sun2.n_name = 'Germany' THEN rn1coolis.ol_amount ELSE 0 END) / SUM(rn1coolis.ol_amount)),2)
+  AS mkt_share
+FROM (
+  SELECT rn1cooli.o_entry_d, rn1cooli.ol_amount, s.s_w_id, s.s_i_id
+  FROM stock s
+  JOIN (
+    SELECT o.o_entry_d, ol.ol_i_id, ol.ol_amount, ol.ol_supply_w_id
+    FROM orders o, o.o_orderline ol, item i
+    JOIN (
+      SELECT c.c_id,c.c_w_id, c.c_d_id
+      FROM customer c
+      JOIN (
+        SELECT n1.n_nationkey
+        FROM nation n1, region r
+        WHERE n1.n_regionkey = r.r_regionkey AND r.r_name = 'Europe'
+      ) nr ON nr.n_nationkey = string_to_codepoint(c.c_state)[0]
+    ) cnr ON cnr.c_id = o.o_c_id
+          AND cnr.c_w_id = o.o_w_id AND cnr.c_d_id = o.o_d_id AND i.i_data LIKE '%b'
+          AND i.i_id = ol.ol_i_id AND ol.ol_i_id < 1000
+          AND o.o_entry_d BETWEEN '2017-01-01 00:00:00.000000' AND '2018-12-31 00:00:00.000000'
+  ) rn1cooli ON rn1cooli.ol_i_id = s.s_i_id AND rn1cooli.ol_supply_w_id = s.s_w_id
+) rn1coolis
+JOIN (
+  SELECT su.su_suppkey, n2.n_name
+  FROM supplier su, nation n2
+  WHERE su.su_nationkey = n2.n_nationkey
+) sun2 ON rn1coolis.s_w_id * rn1coolis.s_i_id MOD 10000 = sun2.su_suppkey
+GROUP BY get_year(date(rn1coolis.o_entry_d))
+ORDER BY l_year;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/joins/inner_right_corr.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/joins/inner_right_corr.sqlpp
new file mode 100644
index 0000000..86821b1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/joins/inner_right_corr.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test plan when right branch of an inner join uses an outer variable.
+ * Currently this results in NL join
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset t1(id uuid not unknown) open type primary key id autogenerated;
+create dataset t2(id uuid not unknown) open type primary key id autogenerated;
+
+select a
+from t1
+let a = (select value count(*) from t2 join t1.x as z on t2.y = z.b );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/ch2/ch2_q8_subquery.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ch2/ch2_q8_subquery.plan
new file mode 100644
index 0000000..3c62aa7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ch2/ch2_q8_subquery.plan
@@ -0,0 +1,128 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$#1(ASC) ]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$333]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_EXCHANGE [$$333]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$278]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- HYBRID_HASH_JOIN [$$304][$$325]  |PARTITIONED|
+                            -- HASH_PARTITION_EXCHANGE [$$304]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- HYBRID_HASH_JOIN [$$280, $$279][$$290, $$320]  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$280, $$279]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN (test.stock)  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$290, $$320]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$297, $$299, $$301][$$317, $$318, $$316]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$297, $$299, $$301]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$290][$$308]  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$290]  |PARTITIONED|
+                                                          -- STREAM_SELECT  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- UNNEST  |PARTITIONED|
+                                                                    -- STREAM_SELECT  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- DATASOURCE_SCAN (test.orders)  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$308]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.item)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$317, $$318, $$316]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$295][$$315]  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$295]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN (test.customer)  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$315]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- HYBRID_HASH_JOIN [$$292][$$293]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$292]  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ASSIGN  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- REPLICATE  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- DATASOURCE_SCAN (test.nation)  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$293]  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- STREAM_SELECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN (test.region)  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                            -- HASH_PARTITION_EXCHANGE [$$325]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$309][$$310]  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$309]  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- DATASOURCE_SCAN (test.supplier)  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$310]  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- REPLICATE  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN (test.nation)  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/joins/inner_right_corr.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/joins/inner_right_corr.plan
new file mode 100644
index 0000000..37c3434
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/joins/inner_right_corr.plan
@@ -0,0 +1,53 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- PRE_CLUSTERED_GROUP_BY[$$71]  |PARTITIONED|
+                    {
+                      -- AGGREGATE  |LOCAL|
+                        -- AGGREGATE  |LOCAL|
+                          -- STREAM_SELECT  |LOCAL|
+                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                    }
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- HYBRID_HASH_JOIN [$$71][$$87]  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- DATASOURCE_SCAN (test.t1)  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$87]  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- UNNEST  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- SUBPLAN  |PARTITIONED|
+                                        {
+                                          -- AGGREGATE  |LOCAL|
+                                            -- STREAM_SELECT  |LOCAL|
+                                              -- ASSIGN  |LOCAL|
+                                                -- UNNEST  |LOCAL|
+                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                        }
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- NESTED_LOOP  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- DATASOURCE_SCAN (test.t1)  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- DATASOURCE_SCAN (test.t2)  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-3006.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-3006.plan
index a5264a9..dfaa310 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-3006.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-3006.plan
@@ -1,49 +1,41 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- SORT_MERGE_EXCHANGE [$$64(ASC) ]  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$55(ASC) ]  |PARTITIONED|
         -- STREAM_PROJECT  |PARTITIONED|
           -- STREAM_SELECT  |PARTITIONED|
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- SORT_GROUP_BY[$$78]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$65]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                       }
-                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  -- PRE_CLUSTERED_GROUP_BY[$$48]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$65]  |PARTITIONED|
+                  -- PRE_CLUSTERED_GROUP_BY[$$46]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
                               -- STREAM_SELECT  |LOCAL|
                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                           }
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- STREAM_PROJECT  |PARTITIONED|
+                      -- STABLE_SORT [$$46(ASC)]  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- HYBRID_HASH_JOIN [$$48][$$61]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- DATASOURCE_SCAN (test.ds1)  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$61]  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- STREAM_PROJECT  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- HYBRID_HASH_JOIN [$$b][$$51]  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$b]  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- UNNEST  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ASSIGN  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- DATASOURCE_SCAN (test.ds1)  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$51]  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- ASSIGN  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- DATASOURCE_SCAN (test.ds2)  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                              -- HYBRID_HASH_JOIN [$$b][$$48]  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$b]  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- UNNEST  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN (test.ds1)  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$48]  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN (test.ds2)  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
index 77b9991..9115b1c 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
@@ -23,6 +23,7 @@
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.lang.common.base.AbstractClause;
 import org.apache.asterix.lang.common.base.Clause.ClauseType;
 import org.apache.asterix.lang.common.base.Expression;
@@ -50,6 +51,7 @@
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.QuantifiedPair;
 import org.apache.asterix.lang.sqlpp.clause.AbstractBinaryCorrelateClause;
+import org.apache.asterix.lang.sqlpp.clause.AbstractBinaryCorrelateWithConditionClause;
 import org.apache.asterix.lang.sqlpp.clause.FromClause;
 import org.apache.asterix.lang.sqlpp.clause.FromTerm;
 import org.apache.asterix.lang.sqlpp.clause.HavingClause;
@@ -107,41 +109,74 @@
         }
 
         // Visits join/unnest/nest clauses.
-        for (AbstractBinaryCorrelateClause correlateClause : fromTerm.getCorrelateClauses()) {
-            Collection<VariableExpr> correlateFreeVars = new HashSet<>();
-            correlateClause.accept(this, correlateFreeVars);
-            if (correlateClause.getClauseType() != ClauseType.JOIN_CLAUSE) {
-                // Correlation is allowed if the clause is not a join clause,
-                // therefore we remove left-side binding variables for these cases.
-                correlateFreeVars.removeAll(bindingVariables);
-
-                // Adds binding variables.
-                bindingVariables.add(correlateClause.getRightVariable());
-                if (correlateClause.hasPositionalVariable()) {
-                    bindingVariables.add(correlateClause.getPositionalVariable());
-                }
+        Collection<VariableExpr> clauseFreeVars = null;
+        Collection<VariableExpr> conditionFreeVars = null;
+        for (AbstractBinaryCorrelateClause clause : fromTerm.getCorrelateClauses()) {
+            if (clauseFreeVars == null) {
+                clauseFreeVars = new HashSet<>();
+            } else {
+                clauseFreeVars.clear();
             }
-            freeVars.addAll(correlateFreeVars);
+            clause.getRightExpression().accept(this, clauseFreeVars);
+
+            switch (clause.getClauseType()) {
+                case UNNEST_CLAUSE:
+                    // right branch CAN be use binding variables from prior clauses
+                    // -> these binding variables are not free vars for the whole FromTerm
+                    clauseFreeVars.removeAll(bindingVariables);
+                    break;
+                case JOIN_CLAUSE:
+                case NEST_CLAUSE:
+                    // right branch CANNOT use binding variables from prior clauses, but condition expression CAN.
+                    if (conditionFreeVars == null) {
+                        conditionFreeVars = new HashSet<>();
+                    } else {
+                        conditionFreeVars.clear();
+                    }
+                    AbstractBinaryCorrelateWithConditionClause clauseWithCondition =
+                            (AbstractBinaryCorrelateWithConditionClause) clause;
+                    clauseWithCondition.getConditionExpression().accept(this, conditionFreeVars);
+                    conditionFreeVars.removeAll(bindingVariables);
+                    conditionFreeVars.remove(clause.getRightVariable());
+                    if (clause.hasPositionalVariable()) {
+                        conditionFreeVars.remove(clause.getPositionalVariable());
+                    }
+                    clauseFreeVars.addAll(conditionFreeVars);
+                    break;
+                default:
+                    throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, clause.getSourceLocation(),
+                            clause.getClauseType().toString());
+            }
+
+            // Adds binding variables.
+            bindingVariables.add(clause.getRightVariable());
+            if (clause.hasPositionalVariable()) {
+                bindingVariables.add(clause.getPositionalVariable());
+            }
+            freeVars.addAll(clauseFreeVars);
         }
         return null;
     }
 
     @Override
-    public Void visit(JoinClause joinClause, Collection<VariableExpr> freeVars) throws CompilationException {
-        visitJoinAndNest(joinClause, joinClause.getConditionExpression(), freeVars);
-        return null;
+    public Void visit(JoinClause joinClause, Collection<VariableExpr> arg) throws CompilationException {
+        // not supposed to be invoked
+        throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, joinClause.getSourceLocation(),
+                joinClause.getClauseType().toString());
     }
 
     @Override
-    public Void visit(NestClause nestClause, Collection<VariableExpr> freeVars) throws CompilationException {
-        visitJoinAndNest(nestClause, nestClause.getConditionExpression(), freeVars);
-        return null;
+    public Void visit(NestClause nestClause, Collection<VariableExpr> arg) throws CompilationException {
+        // not supposed to be invoked
+        throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, nestClause.getSourceLocation(),
+                nestClause.getClauseType().toString());
     }
 
     @Override
-    public Void visit(UnnestClause unnestClause, Collection<VariableExpr> freeVars) throws CompilationException {
-        unnestClause.getRightExpression().accept(this, freeVars);
-        return null;
+    public Void visit(UnnestClause unnestClause, Collection<VariableExpr> arg) throws CompilationException {
+        // not supposed to be invoked
+        throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, unnestClause.getSourceLocation(),
+                unnestClause.getClauseType().toString());
     }
 
     @Override
@@ -491,20 +526,6 @@
         }
     }
 
-    private void visitJoinAndNest(AbstractBinaryCorrelateClause clause, Expression condition,
-            Collection<VariableExpr> freeVars) throws CompilationException {
-        clause.getRightExpression().accept(this, freeVars);
-        Collection<VariableExpr> conditionFreeVars = new HashSet<>();
-        condition.accept(this, freeVars);
-
-        // The condition expression can free binding variables defined in the join clause.
-        conditionFreeVars.remove(clause.getRightVariable());
-        if (clause.hasPositionalVariable()) {
-            conditionFreeVars.remove(clause.getPositionalVariable());
-        }
-        freeVars.addAll(conditionFreeVars);
-    }
-
     private void visit(List<Expression> exprs, Collection<VariableExpr> arg) throws CompilationException {
         for (Expression expr : exprs) {
             expr.accept(this, arg);