ASTERIXDB-1487: fix the wrong plan when we prune the selective branch.

1. Add the test case of ASTERIX-1487 with single join branch required.
2. Disable the join branch pruning in case of unnestmap following datasourcescan.
   - We need to prune the join branch when it is NOT required by the upstream operators and its generated join key is derived from the same DATASOURCE of the other branch.
   - We SHOULD NOT prune the join branch if there exists a selective operator (UNNESTMAP, LOUNNESTMAP, LIMIT, SELECT) located between the join operator and DATASOURCESCAN.

Change-Id: I1aef69a2278853fd9f8020da6639331b367ed5ad
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1119
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
index 2e43912..03c7663 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
@@ -50,6 +50,7 @@
  * 1. The live variables of one input branch of the join are not used in the upstream plan
  * 2. The join is an inner equi join
  * 3. The join condition only uses variables that correspond to primary keys of the same dataset
+ * 4. The records of one input branch will not be filtered by the selective operators till join.
  * Notice that the last condition implies a 1:1 join, i.e., the join does not change the result cardinality.
  * Joins that satisfy the above conditions may be introduced by other rules
  * which use surrogate optimizations. Such an optimization aims to reduce data copies and communication costs by
@@ -61,11 +62,11 @@
  */
 public class RemoveUnusedOneToOneEquiJoinRule implements IAlgebraicRewriteRule {
 
-    private final Set<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>();
-    private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
-    private final List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
-    private final List<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
-    private final List<DataSourceScanOperator> dataScans = new ArrayList<DataSourceScanOperator>();
+    private final Set<LogicalVariable> parentsUsedVars = new HashSet<>();
+    private final List<LogicalVariable> usedVars = new ArrayList<>();
+    private final List<LogicalVariable> liveVars = new ArrayList<>();
+    private final List<LogicalVariable> pkVars = new ArrayList<>();
+    private final List<DataSourceScanOperator> dataScans = new ArrayList<>();
     private boolean hasRun = false;
 
     @Override
@@ -179,9 +180,35 @@
             // keys from datasource scans of the same dataset.
             return -1;
         }
+        // Suppose we Project B over A.a ~= B.b, where A's fields are involved in a selective operator.
+        // We expect the post-plan will NOT prune the join part derived from A.
+        if (unusedJoinBranchIndex >= 0
+                && isSelectionAboveDataScan(opRef.getValue().getInputs().get(unusedJoinBranchIndex))) {
+            unusedJoinBranchIndex = -1;
+        }
         return unusedJoinBranchIndex;
     }
 
+    private boolean isSelectionAboveDataScan(Mutable<ILogicalOperator> opRef) {
+        boolean hasSelection = false;
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        LogicalOperatorTag tag = op.getOperatorTag();
+        switch (tag) {
+            case DATASOURCESCAN:
+                return false;
+            case UNNEST_MAP:
+            case LEFT_OUTER_UNNEST_MAP:
+            case LIMIT:
+            case SELECT:
+                return true;
+            default:
+                for (Mutable<ILogicalOperator> inputOp : op.getInputs()) {
+                    hasSelection |= isSelectionAboveDataScan(inputOp);
+                }
+        }
+        return hasSelection;
+    }
+
     private void gatherProducingDataScans(Mutable<ILogicalOperator> opRef, List<LogicalVariable> joinUsedVars,
             List<DataSourceScanOperator> dataScans) {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 0bc5e78..8f42904 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -65,6 +65,7 @@
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -73,7 +74,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/select-self-join.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/select-self-join.aql
new file mode 100644
index 0000000..720bacdc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/select-self-join.aql
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description    : Tests that self-join on primary key with select introduces surrogate join.
+ * Success        : Yes
+ */
+ 
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to asterix_nc1:"opttest/select-self-join.adm";
+
+create type empType as open {
+id: int,
+sal: int
+}
+
+create dataset Emps(empType) primary key id;
+
+for $e1 in dataset Emps
+for $e2 in (for $e3 in dataset Emps where $e3.sal > 1000 return $e3)
+where $e1.id = $e2.id
+return $e1
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/issue741.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/issue741.plan
index 4e40dd2..5b08bf5 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/issue741.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/issue741.plan
@@ -13,20 +13,33 @@
               -- STABLE_SORT [$$25(ASC)]  |PARTITIONED|
                 -- HASH_PARTITION_EXCHANGE [$$25]  |PARTITIONED|
                   -- STREAM_PROJECT  |PARTITIONED|
-                    -- STREAM_SELECT  |PARTITIONED|
-                      -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$36][$$25]  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- BTREE_SEARCH  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- STABLE_SORT [$$39(ASC)]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$25]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH  |PARTITIONED|
-                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
-                                        -- STREAM_SELECT  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- BTREE_SEARCH  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STABLE_SORT [$$39(ASC)]  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH  |PARTITIONED|
+                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- STREAM_SELECT  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/select-self-join.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/select-self-join.plan
new file mode 100644
index 0000000..45c0e3e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/select-self-join.plan
@@ -0,0 +1,16 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- HYBRID_HASH_JOIN [$$11][$$12]  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- DATASOURCE_SCAN  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- STREAM_SELECT  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- DATASOURCE_SCAN  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.1.ddl.aql
new file mode 100644
index 0000000..811c2b5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.1.ddl.aql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse fuzzyjoin if exists;
+
+create dataverse fuzzyjoin;
+
+use dataverse fuzzyjoin;
+
+create type DBLPType as open {
+  id: int64,
+  dblpid: string?,
+  title: string?,
+  authors: string?,
+  misc: string?
+}
+
+create type CSXType as open {
+  id: int64,
+  csxid: string?,
+  title: string?,
+  authors: string?,
+  misc: string?
+}
+
+create dataset DBLP(DBLPType) primary key id;
+create dataset CSX(CSXType) primary key id;
+
+create index author_index on DBLP(authors) type keyword;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.2.update.aql
new file mode 100644
index 0000000..fc2fb4b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.2.update.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use dataverse fuzzyjoin;
+
+load dataset DBLP
+using localfs
+(("path"="asterix_nc1://data/pub-small/dblp-small-id.txt"),("format"="delimited-text"),("delimiter"=":"));
+
+load dataset CSX
+using localfs
+(("path"="asterix_nc1://data/pub-small/csx-small-id.txt"),("format"="delimited-text"),("delimiter"=":"),("quote"="\u0000"));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.3.query.aql
new file mode 100644
index 0000000..7c65b3b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.3.query.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use dataverse fuzzyjoin;
+
+set simthreshold '.7f'
+
+let $s := count(
+for $t in dataset('CSX')
+for $o in dataset('DBLP')
+where contains($o.title, "System") and
+word-tokens($o.authors) ~= word-tokens($t.authors)
+return $t)
+return $s
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.1.adm
new file mode 100644
index 0000000..00750ed
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/fuzzyjoin/dblp-csx-aqlplus_5/dblp-csx-aqlplus_5.1.adm
@@ -0,0 +1 @@
+3
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 5750b28..2f277cc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -2656,6 +2656,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="fuzzyjoin">
+      <compilation-unit name="dblp-csx-aqlplus_5">
+        <output-dir compare="Text">dblp-csx-aqlplus_5</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="fuzzyjoin">
       <compilation-unit name="dblp-csx-dblp-aqlplus_1">
         <output-dir compare="Text">dblp-csx-dblp-aqlplus_1</output-dir>
       </compilation-unit>