[NO ISSUE][COMP] Fix error in RemoveUnusedOneToOneEquiJoinRule

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Fix internal error in RemoveUnusedOneToOneEquiJoinRule
  when function datasource is joined with a dataset

Change-Id: Iad4d17a0a3bb4cea23625fbbce0c623c597c4991
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9226
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
index 6a70786..61361f6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
@@ -24,7 +24,9 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.asterix.metadata.declared.DataSource;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -227,12 +229,14 @@
 
     private void fillPKVars(DataSourceScanOperator dataScan, List<LogicalVariable> pkVars) {
         pkVars.clear();
-        DatasetDataSource datasetDataSource = (DatasetDataSource) dataScan.getDataSource();
-        pkVars.clear();
-        if (datasetDataSource.getDataset().getDatasetDetails() instanceof InternalDatasetDetails) {
-            int numPKs = datasetDataSource.getDataset().getPrimaryKeys().size();
-            for (int i = 0; i < numPKs; i++) {
-                pkVars.add(dataScan.getVariables().get(i));
+        DataSource dataSource = (DataSource) dataScan.getDataSource();
+        if (dataSource.getDatasourceType() == DataSource.Type.INTERNAL_DATASET) {
+            Dataset dataset = ((DatasetDataSource) dataSource).getDataset();
+            if (dataset.getDatasetDetails() instanceof InternalDatasetDetails) {
+                int numPKs = dataset.getPrimaryKeys().size();
+                for (int i = 0; i < numPKs; i++) {
+                    pkVars.add(dataScan.getVariables().get(i));
+                }
             }
         }
     }
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/joins/fnds_join_ds.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/joins/fnds_join_ds.sqlpp
new file mode 100644
index 0000000..5ae9ad6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/joins/fnds_join_ds.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Functional datasource join with dataset
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type test.TestType as
+{
+  id : integer
+};
+
+create  dataset t1(TestType) primary key id;
+
+set `import-private-functions` `true`;
+
+select count(*)
+from tpcds_datagen("customer_address", 1.0) tpcds, t1
+where tpcds.ca_address_id = t1.aid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/joins/fnds_join_ds.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/joins/fnds_join_ds.plan
new file mode 100644
index 0000000..a561ca4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/joins/fnds_join_ds.plan
@@ -0,0 +1,24 @@
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      -- ASSIGN  |UNPARTITIONED|
+        -- AGGREGATE  |UNPARTITIONED|
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            -- AGGREGATE  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- HYBRID_HASH_JOIN [$$48][$$49]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$48]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- DATASOURCE_SCAN (asterix.tpcds-datagen.customer_address.1.0)  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$49]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- DATASOURCE_SCAN (test.t1)  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file