ASTERIXDB-1325, ASTERIXDB-1326: fixed leftouterjoin on an external dataset

 - Fixed the issue that optimizer ignores index_nl hint on an index on an external dataset.
 - Fixed the issue that index-nested-loop-leftouterjoin on an external dataset returns an empty results.
 - Modified OptimizerTest to enable mockup HDFSCluster.
 - Added an optimizer test on an external dataset.

Change-Id: I91b2e57f0bd1beaa58dd28a84d8e08d308246b26
Reviewed-on: https://asterix-gerrit.ics.uci.edu/672
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index be3dad8..4865710 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -117,10 +117,6 @@
             dataSourceType = DataSourceType.DATASOURCE_SCAN;
             dataSourceRef = subTreeOpRef;
             return true;
-        } else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.EXTERNAL_LOOKUP) {
-            dataSourceType = DataSourceType.EXTERNAL_SCAN;
-            dataSourceRef = subTreeOpRef;
-            return true;
         } else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
             dataSourceType = DataSourceType.COLLECTION_SCAN;
             dataSourceRef = subTreeOpRef;
@@ -152,6 +148,18 @@
                                 }
                                 dataSourceFound = true;
                             }
+                        } else if (f.getFunctionIdentifier().equals(AsterixBuiltinFunctions.EXTERNAL_LOOKUP)) {
+                            // External lookup case
+                            if (dataSourceRef == null) {
+                                dataSourceRef = subTreeOpRef;
+                                dataSourceType = DataSourceType.EXTERNAL_SCAN;
+                            } else {
+                                // One datasource already exists. This is an additional datasource.
+                                initializeIxJoinOuterAddtionalDataSourcesIfEmpty();
+                                ixJoinOuterAdditionalDataSourceTypes.add(DataSourceType.EXTERNAL_SCAN);
+                                ixJoinOuterAdditionalDataSourceRefs.add(subTreeOpRef);
+                            }
+                            dataSourceFound = true;
                         }
                     }
                 } else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
@@ -159,11 +167,6 @@
                     ixJoinOuterAdditionalDataSourceTypes.add(DataSourceType.DATASOURCE_SCAN);
                     ixJoinOuterAdditionalDataSourceRefs.add(subTreeOpRef);
                     dataSourceFound = true;
-                } else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.EXTERNAL_LOOKUP) {
-                    initializeIxJoinOuterAddtionalDataSourcesIfEmpty();
-                    ixJoinOuterAdditionalDataSourceTypes.add(DataSourceType.EXTERNAL_SCAN);
-                    ixJoinOuterAdditionalDataSourceRefs.add(subTreeOpRef);
-                    dataSourceFound = true;
                 } else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
                     initializeIxJoinOuterAddtionalDataSourcesIfEmpty();
                     ixJoinOuterAdditionalDataSourceTypes.add(DataSourceType.COLLECTION_SCAN);
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index c62f5d9..60a543c 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -38,6 +38,7 @@
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
 import org.apache.asterix.test.base.AsterixTestHelper;
 import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.runtime.HDFSCluster;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
@@ -77,6 +78,8 @@
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
 
+        HDFSCluster.getInstance().setup();
+
         AsterixHyracksIntegrationUtil.init(true);
         // Set the node resolver to be the identity resolver that expects node names
         // to be node controller ids; a valid assumption in test environment.
@@ -92,6 +95,9 @@
         if (files == null || files.length == 0) {
             outdir.delete();
         }
+
+        HDFSCluster.getInstance().cleanup();
+
         AsterixHyracksIntegrationUtil.deinit(true);
     }
 
diff --git a/asterix-app/src/test/resources/optimizerts/queries/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.aql b/asterix-app/src/test/resources/optimizerts/queries/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.aql
new file mode 100644
index 0000000..4c62bb7
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.aql
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description  : Test that left-outer-join may use available indexes, one for a secondary index in prob subtree and another for secondary btree index in index subtree.
+ * Issue        : 1325, 1326
+ * Expected Res : Success
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TwitterUserType as closed {
+    screen-name: string,
+    lang: string,
+    friends-count: int64,
+    statuses-count: int64,
+    name: string,
+    followers-count: int64
+}
+
+create type TweetMessageType as closed {
+    tweetid: int64,
+        user: TwitterUserType,
+        sender-location: point,
+    send-time: datetime,
+        referred-topics: {{ string }},
+    message-text: string,
+    countA: int64,
+    countB: int64
+}
+
+create external dataset TweetMessages(TweetMessageType) using hdfs(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/tw_for_indexleftouterjoin.adm"),("input-format"="text-input-format"),("format"="adm"));
+
+create index IdIx on TweetMessages(tweetid) type btree;
+create index msgCountAIx on TweetMessages(countA) type btree;
+create index msgCountBIx on TweetMessages(countB) type btree;
+
+write output to asterix_nc1:"rttest/external-indexing_leftouterjoin-probe-sidx-with-join-btree-sidx_01.adm";
+
+for $t1 in dataset('TweetMessages')
+where $t1.tweetid < int64("10")
+order by $t1.tweetid
+return {
+"tweetid1": $t1.tweetid,
+"count1":$t1.countA,
+"t2info": for $t2 in dataset('TweetMessages')
+          where $t1.countA /* +indexnl */= $t2.countB
+          order by $t2.tweetid
+          return {"tweetid2": $t2.tweetid,
+                  "count2":$t2.countB}
+};
diff --git a/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan b/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan
new file mode 100644
index 0000000..e40816f
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan
@@ -0,0 +1,41 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$27(ASC), $$28(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$19, $$21]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- STREAM_SELECT  |LOCAL|
+                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STABLE_SORT [$$19(ASC), $$21(ASC), $$22(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$19, $$21]  |PARTITIONED|
+                  -- STREAM_SELECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- EXTERNAL_LOOKUP  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STABLE_SORT [$$34(ASC), $$35(ASC)]  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- BTREE_SEARCH  |PARTITIONED|
+                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_SELECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- EXTERNAL_LOOKUP  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- STABLE_SORT [$$31(ASC), $$32(ASC)]  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- ASSIGN  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index 76c8b85..f149ed3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -49,7 +49,6 @@
     private Map<String, String> configuration;
     private boolean retainInput;
     private boolean retainNull;
-    private int[] propagatedFields;
     private INullWriterFactory iNullWriterFactory;
 
     public LookupAdapterFactory(ARecordType recordType, int[] ridFields, boolean retainInput, boolean retainNull,
@@ -70,9 +69,8 @@
                     snapshotAccessor);
             reader.configure(configuration);
             RecordIdReader ridReader = RecordIdReaderFactory.create(configuration, ridFields);
-            configurePropagatedFields(inRecDesc);
-            return new LookupAdapter<T>(dataParser, reader, inRecDesc, ridReader, retainInput, propagatedFields,
-                    retainNull, iNullWriterFactory, ctx, writer);
+            return new LookupAdapter<T>(dataParser, reader, inRecDesc, ridReader, retainInput, retainNull,
+                    iNullWriterFactory, ctx, writer);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
@@ -87,25 +85,4 @@
         dataParserFactory.configure(configuration);
     }
 
-    private void configurePropagatedFields(RecordDescriptor inRecDesc) {
-        int ptr = 0;
-        boolean skip = false;
-        propagatedFields = new int[inRecDesc.getFieldCount() - ridFields.length];
-        for (int i = 0; i < inRecDesc.getFieldCount(); i++) {
-            if (ptr < ridFields.length) {
-                skip = false;
-                for (int j = 0; j < ridFields.length; j++) {
-                    if (ridFields[j] == i) {
-                        ptr++;
-                        skip = true;
-                        break;
-                    }
-                }
-                if (!skip)
-                    propagatedFields[i - ptr] = i;
-            } else {
-                propagatedFields[i - ptr] = i;
-            }
-        }
-    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index a97182c..69cd063 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -43,7 +43,6 @@
 public final class LookupAdapter<T> implements IFrameWriter {
 
     private boolean propagateInput;
-    private int[] propagatedFields;
     private boolean retainNull;
     private ArrayTupleBuilder tb;
     private FrameTupleAppender appender;
@@ -56,13 +55,12 @@
     private ArrayTupleBuilder nullTupleBuild;
 
     public LookupAdapter(IRecordDataParser<T> dataParser, ILookupRecordReader<? extends T> recordReader,
-            RecordDescriptor inRecDesc, RecordIdReader ridReader, boolean propagateInput, int[] propagatedFields,
-            boolean retainNull, INullWriterFactory iNullWriterFactory, IHyracksTaskContext ctx, IFrameWriter writer)
+            RecordDescriptor inRecDesc, RecordIdReader ridReader, boolean propagateInput, boolean retainNull,
+            INullWriterFactory iNullWriterFactory, IHyracksTaskContext ctx, IFrameWriter writer)
                     throws HyracksDataException {
         this.dataParser = dataParser;
         this.recordReader = recordReader;
         this.propagateInput = propagateInput;
-        this.propagatedFields = propagatedFields;
         this.retainNull = retainNull;
         this.tupleAccessor = new FrameTupleAccessor(inRecDesc);
         this.ridReader = ridReader;
@@ -74,7 +72,9 @@
 
     private void configurePropagation(INullWriterFactory iNullWriterFactory) {
         if (propagateInput) {
-            tb = new ArrayTupleBuilder(propagatedFields.length + 1);
+            // This LookupAdapter generates an external record as its output.
+            // Thus, we add 1.
+            tb = new ArrayTupleBuilder(tupleAccessor.getFieldCount() + 1);
             frameTuple = new FrameTupleReference();
         } else {
             tb = new ArrayTupleBuilder(1);
@@ -124,7 +124,7 @@
                 }
                 tb.reset();
                 if (propagateInput) {
-                    propagate(tupleIndex);
+                    propagateInputFields(tupleIndex);
                 }
                 if (record != null) {
                     dataParser.parse(record, tb.getDataOutput());
@@ -142,11 +142,11 @@
         }
     }
 
-    private void propagate(int idx) throws IOException {
+    private void propagateInputFields(int idx) throws IOException {
         frameTuple.reset(tupleAccessor, idx);
-        for (int i = 0; i < propagatedFields.length; i++) {
-            tb.getDataOutput().write(frameTuple.getFieldData(propagatedFields[i]),
-                    frameTuple.getFieldStart(propagatedFields[i]), frameTuple.getFieldLength(propagatedFields[i]));
+        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+            tb.getDataOutput().write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
+                    frameTuple.getFieldLength(i));
             tb.addFieldEndOffset();
         }
     }