ASTERIXDB-1325, ASTERIXDB-1326: fixed leftouterjoin on an external dataset
- Fixed the issue that optimizer ignores index_nl hint on an index on an external dataset.
- Fixed the issue that index-nested-loop-leftouterjoin on an external dataset returns an empty results.
- Modified OptimizerTest to enable mockup HDFSCluster.
- Added an optimizer test on an external dataset.
Change-Id: I91b2e57f0bd1beaa58dd28a84d8e08d308246b26
Reviewed-on: https://asterix-gerrit.ics.uci.edu/672
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index be3dad8..4865710 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -117,10 +117,6 @@
dataSourceType = DataSourceType.DATASOURCE_SCAN;
dataSourceRef = subTreeOpRef;
return true;
- } else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.EXTERNAL_LOOKUP) {
- dataSourceType = DataSourceType.EXTERNAL_SCAN;
- dataSourceRef = subTreeOpRef;
- return true;
} else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
dataSourceType = DataSourceType.COLLECTION_SCAN;
dataSourceRef = subTreeOpRef;
@@ -152,6 +148,18 @@
}
dataSourceFound = true;
}
+ } else if (f.getFunctionIdentifier().equals(AsterixBuiltinFunctions.EXTERNAL_LOOKUP)) {
+ // External lookup case
+ if (dataSourceRef == null) {
+ dataSourceRef = subTreeOpRef;
+ dataSourceType = DataSourceType.EXTERNAL_SCAN;
+ } else {
+ // One datasource already exists. This is an additional datasource.
+ initializeIxJoinOuterAddtionalDataSourcesIfEmpty();
+ ixJoinOuterAdditionalDataSourceTypes.add(DataSourceType.EXTERNAL_SCAN);
+ ixJoinOuterAdditionalDataSourceRefs.add(subTreeOpRef);
+ }
+ dataSourceFound = true;
}
}
} else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
@@ -159,11 +167,6 @@
ixJoinOuterAdditionalDataSourceTypes.add(DataSourceType.DATASOURCE_SCAN);
ixJoinOuterAdditionalDataSourceRefs.add(subTreeOpRef);
dataSourceFound = true;
- } else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.EXTERNAL_LOOKUP) {
- initializeIxJoinOuterAddtionalDataSourcesIfEmpty();
- ixJoinOuterAdditionalDataSourceTypes.add(DataSourceType.EXTERNAL_SCAN);
- ixJoinOuterAdditionalDataSourceRefs.add(subTreeOpRef);
- dataSourceFound = true;
} else if (subTreeOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
initializeIxJoinOuterAddtionalDataSourcesIfEmpty();
ixJoinOuterAdditionalDataSourceTypes.add(DataSourceType.COLLECTION_SCAN);
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index c62f5d9..60a543c 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -38,6 +38,7 @@
import org.apache.asterix.external.util.IdentitiyResolverFactory;
import org.apache.asterix.test.base.AsterixTestHelper;
import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.runtime.HDFSCluster;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
@@ -77,6 +78,8 @@
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
+ HDFSCluster.getInstance().setup();
+
AsterixHyracksIntegrationUtil.init(true);
// Set the node resolver to be the identity resolver that expects node names
// to be node controller ids; a valid assumption in test environment.
@@ -92,6 +95,9 @@
if (files == null || files.length == 0) {
outdir.delete();
}
+
+ HDFSCluster.getInstance().cleanup();
+
AsterixHyracksIntegrationUtil.deinit(true);
}
diff --git a/asterix-app/src/test/resources/optimizerts/queries/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.aql b/asterix-app/src/test/resources/optimizerts/queries/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.aql
new file mode 100644
index 0000000..4c62bb7
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.aql
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Test that left-outer-join may use available indexes, one for a secondary index in prob subtree and another for secondary btree index in index subtree.
+ * Issue : 1325, 1326
+ * Expected Res : Success
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TwitterUserType as closed {
+ screen-name: string,
+ lang: string,
+ friends-count: int64,
+ statuses-count: int64,
+ name: string,
+ followers-count: int64
+}
+
+create type TweetMessageType as closed {
+ tweetid: int64,
+ user: TwitterUserType,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int64,
+ countB: int64
+}
+
+create external dataset TweetMessages(TweetMessageType) using hdfs(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/tw_for_indexleftouterjoin.adm"),("input-format"="text-input-format"),("format"="adm"));
+
+create index IdIx on TweetMessages(tweetid) type btree;
+create index msgCountAIx on TweetMessages(countA) type btree;
+create index msgCountBIx on TweetMessages(countB) type btree;
+
+write output to asterix_nc1:"rttest/external-indexing_leftouterjoin-probe-sidx-with-join-btree-sidx_01.adm";
+
+for $t1 in dataset('TweetMessages')
+where $t1.tweetid < int64("10")
+order by $t1.tweetid
+return {
+"tweetid1": $t1.tweetid,
+"count1":$t1.countA,
+"t2info": for $t2 in dataset('TweetMessages')
+ where $t1.countA /* +indexnl */= $t2.countB
+ order by $t2.tweetid
+ return {"tweetid2": $t2.tweetid,
+ "count2":$t2.countB}
+};
diff --git a/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan b/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan
new file mode 100644
index 0000000..e40816f
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan
@@ -0,0 +1,41 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$27(ASC), $$28(ASC) ] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$19, $$21] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$19(ASC), $$21(ASC), $$22(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$19, $$21] |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EXTERNAL_LOOKUP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$34(ASC), $$35(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EXTERNAL_LOOKUP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$31(ASC), $$32(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index 76c8b85..f149ed3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -49,7 +49,6 @@
private Map<String, String> configuration;
private boolean retainInput;
private boolean retainNull;
- private int[] propagatedFields;
private INullWriterFactory iNullWriterFactory;
public LookupAdapterFactory(ARecordType recordType, int[] ridFields, boolean retainInput, boolean retainNull,
@@ -70,9 +69,8 @@
snapshotAccessor);
reader.configure(configuration);
RecordIdReader ridReader = RecordIdReaderFactory.create(configuration, ridFields);
- configurePropagatedFields(inRecDesc);
- return new LookupAdapter<T>(dataParser, reader, inRecDesc, ridReader, retainInput, propagatedFields,
- retainNull, iNullWriterFactory, ctx, writer);
+ return new LookupAdapter<T>(dataParser, reader, inRecDesc, ridReader, retainInput, retainNull,
+ iNullWriterFactory, ctx, writer);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -87,25 +85,4 @@
dataParserFactory.configure(configuration);
}
- private void configurePropagatedFields(RecordDescriptor inRecDesc) {
- int ptr = 0;
- boolean skip = false;
- propagatedFields = new int[inRecDesc.getFieldCount() - ridFields.length];
- for (int i = 0; i < inRecDesc.getFieldCount(); i++) {
- if (ptr < ridFields.length) {
- skip = false;
- for (int j = 0; j < ridFields.length; j++) {
- if (ridFields[j] == i) {
- ptr++;
- skip = true;
- break;
- }
- }
- if (!skip)
- propagatedFields[i - ptr] = i;
- } else {
- propagatedFields[i - ptr] = i;
- }
- }
- }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index a97182c..69cd063 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -43,7 +43,6 @@
public final class LookupAdapter<T> implements IFrameWriter {
private boolean propagateInput;
- private int[] propagatedFields;
private boolean retainNull;
private ArrayTupleBuilder tb;
private FrameTupleAppender appender;
@@ -56,13 +55,12 @@
private ArrayTupleBuilder nullTupleBuild;
public LookupAdapter(IRecordDataParser<T> dataParser, ILookupRecordReader<? extends T> recordReader,
- RecordDescriptor inRecDesc, RecordIdReader ridReader, boolean propagateInput, int[] propagatedFields,
- boolean retainNull, INullWriterFactory iNullWriterFactory, IHyracksTaskContext ctx, IFrameWriter writer)
+ RecordDescriptor inRecDesc, RecordIdReader ridReader, boolean propagateInput, boolean retainNull,
+ INullWriterFactory iNullWriterFactory, IHyracksTaskContext ctx, IFrameWriter writer)
throws HyracksDataException {
this.dataParser = dataParser;
this.recordReader = recordReader;
this.propagateInput = propagateInput;
- this.propagatedFields = propagatedFields;
this.retainNull = retainNull;
this.tupleAccessor = new FrameTupleAccessor(inRecDesc);
this.ridReader = ridReader;
@@ -74,7 +72,9 @@
private void configurePropagation(INullWriterFactory iNullWriterFactory) {
if (propagateInput) {
- tb = new ArrayTupleBuilder(propagatedFields.length + 1);
+ // This LookupAdapter generates an external record as its output.
+ // Thus, we add 1.
+ tb = new ArrayTupleBuilder(tupleAccessor.getFieldCount() + 1);
frameTuple = new FrameTupleReference();
} else {
tb = new ArrayTupleBuilder(1);
@@ -124,7 +124,7 @@
}
tb.reset();
if (propagateInput) {
- propagate(tupleIndex);
+ propagateInputFields(tupleIndex);
}
if (record != null) {
dataParser.parse(record, tb.getDataOutput());
@@ -142,11 +142,11 @@
}
}
- private void propagate(int idx) throws IOException {
+ private void propagateInputFields(int idx) throws IOException {
frameTuple.reset(tupleAccessor, idx);
- for (int i = 0; i < propagatedFields.length; i++) {
- tb.getDataOutput().write(frameTuple.getFieldData(propagatedFields[i]),
- frameTuple.getFieldStart(propagatedFields[i]), frameTuple.getFieldLength(propagatedFields[i]));
+ for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+ tb.getDataOutput().write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
+ frameTuple.getFieldLength(i));
tb.addFieldEndOffset();
}
}