Support broadcast join.
- The branch for broadcast is always the right branch, i.e., the build branch.
Change-Id: I269d29816206f4f7c21097c99b6e3f19b29be138
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1340
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpch/q12_shipping_broadcast.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpch/q12_shipping_broadcast.sqlpp
new file mode 100644
index 0000000..94b5d2b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpch/q12_shipping_broadcast.sqlpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+DROP dataverse tpch IF EXISTS;
+CREATE dataverse tpch;
+
+USE tpch;
+
+
+CREATE TYPE LineItemType AS {
+ l_linenumber : integer
+}
+
+CREATE TYPE OrderType AS {
+ o_orderkey : integer
+}
+
+CREATE DATASET LineItem(LineItemType) PRIMARY KEY l_linenumber;
+
+CREATE DATASET Orders(OrderType) PRIMARY KEY o_orderkey;
+
+/** The plan tests that the expression for different switch-case branches are not extracted.*/
+SELECT l.l_shipmode,
+ sum(CASE
+ WHEN o.o_orderpriority = '1-URGENT' or o.o_orderpriority = '2-HIGH' THEN 1 + o.o_orderpriority * 0
+ ELSE 0 + o.o_orderpriority * 0
+ END) high_line_count,
+ sum(CASE o.o_orderpriority = '1-URGENT' or o.o_orderpriority = '2-HIGH'
+ WHEN true THEN 0 + o.o_orderpriority * 0
+ ELSE 1 + o.o_orderpriority * 0
+ END) low_line_count
+FROM LineItem l,
+ Orders o
+WHERE l.l_orderkey /*+ bcast */ = o.o_orderkey AND l.l_commitdate < l.l_receiptdate AND
+ l.l_shipdate < l.l_commitdate AND l.l_receiptdate >= '1994-01-01' AND
+ l.l_receiptdate < '1995-01-01' AND (l.l_shipmode = 'MAIL' OR l.l_shipmode = 'SHIP')
+GROUP BY l.l_shipmode
+ORDER BY l.l_shipmode
+;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
new file mode 100644
index 0000000..773b4db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
@@ -0,0 +1,43 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$12(ASC) ] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$104] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$104(ASC)] HASH:[$$104] |PARTITIONED|
+ -- SORT_GROUP_BY[$$86] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$93][$$86] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$93] |PARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$86] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$94][$$90] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/q12_shipping_broadcast/q12_shipping_broadcast.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/q12_shipping_broadcast/q12_shipping_broadcast.1.ddl.sqlpp
new file mode 100644
index 0000000..0db8d36
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/q12_shipping_broadcast/q12_shipping_broadcast.1.ddl.sqlpp
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+ l_orderkey : integer,
+ l_partkey : integer,
+ l_suppkey : integer,
+ l_linenumber : integer,
+ l_quantity : integer,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+}
+
+create type tpch.OrderType as
+ closed {
+ o_orderkey : integer,
+ o_custkey : integer,
+ o_orderstatus : string,
+ o_totalprice : double,
+ o_orderdate : string,
+ o_orderpriority : string,
+ o_clerk : string,
+ o_shippriority : integer,
+ o_comment : string
+}
+
+create type tpch.CustomerType as
+ closed {
+ c_custkey : integer,
+ c_name : string,
+ c_address : string,
+ c_nationkey : integer,
+ c_phone : string,
+ c_acctbal : double,
+ c_mktsegment : string,
+ c_comment : string
+}
+
+create type tpch.SupplierType as
+ closed {
+ s_suppkey : integer,
+ s_name : string,
+ s_address : string,
+ s_nationkey : integer,
+ s_phone : string,
+ s_acctbal : double,
+ s_comment : string
+}
+
+create type tpch.NationType as
+ closed {
+ n_nationkey : integer,
+ n_name : string,
+ n_regionkey : integer,
+ n_comment : string
+}
+
+create type tpch.RegionType as
+ closed {
+ r_regionkey : integer,
+ r_name : string,
+ r_comment : string
+}
+
+create type tpch.PartType as
+ closed {
+ p_partkey : integer,
+ p_name : string,
+ p_mfgr : string,
+ p_brand : string,
+ p_type : string,
+ p_size : integer,
+ p_container : string,
+ p_retailprice : double,
+ p_comment : string
+}
+
+create type tpch.PartSuppType as
+ closed {
+ ps_partkey : integer,
+ ps_suppkey : integer,
+ ps_availqty : integer,
+ ps_supplycost : double,
+ ps_comment : string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
+create dataset Orders(OrderType) primary key o_orderkey;
+
+create dataset Supplier(SupplierType) primary key s_suppkey;
+
+create dataset Region(RegionType) primary key r_regionkey;
+
+create dataset Nation(NationType) primary key n_nationkey;
+
+create dataset Part(PartType) primary key p_partkey;
+
+create dataset Partsupp(PartSuppType) primary key ps_partkey,ps_suppkey;
+
+create dataset Customer(CustomerType) primary key c_custkey;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/q12_shipping_broadcast/q12_shipping_broadcast.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/q12_shipping_broadcast/q12_shipping_broadcast.2.update.sqlpp
new file mode 100644
index 0000000..e72e451
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/q12_shipping_broadcast/q12_shipping_broadcast.2.update.sqlpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+load dataset Orders using localfs ((`path`=`asterix_nc1://data/tpch0.001/orders.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+load dataset Supplier using localfs ((`path`=`asterix_nc1://data/tpch0.001/supplier.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+load dataset Region using localfs ((`path`=`asterix_nc1://data/tpch0.001/region.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+load dataset Nation using localfs ((`path`=`asterix_nc1://data/tpch0.001/nation.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+load dataset Part using localfs ((`path`=`asterix_nc1://data/tpch0.001/part.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+load dataset Partsupp using localfs ((`path`=`asterix_nc1://data/tpch0.001/partsupp.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
+load dataset Customer using localfs ((`path`=`asterix_nc1://data/tpch0.001/customer.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/q12_shipping_broadcast/q12_shipping_broadcast.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/q12_shipping_broadcast/q12_shipping_broadcast.3.query.sqlpp
new file mode 100644
index 0000000..75b359c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/q12_shipping_broadcast/q12_shipping_broadcast.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+
+SELECT l_shipmode,
+ sum(CASE WHEN o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END) high_line_count,
+ sum(CASE o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' WHEN true THEN 0 ELSE 1 END) low_line_count
+FROM LineItem,
+ Orders
+WHERE l_orderkey /*+ bcast */ = o_orderkey AND l_commitdate < l_receiptdate AND
+ l_shipdate < l_commitdate AND l_receiptdate >= '1994-01-01' AND
+ l_receiptdate < '1995-01-01' AND (l_shipmode = 'MAIL' OR l_shipmode = 'SHIP')
+GROUP BY l_shipmode
+ORDER BY l_shipmode
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 1030454..5a2083e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6617,6 +6617,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="tpch-sql-sugar">
+ <compilation-unit name="q12_shipping_broadcast">
+ <output-dir compare="Text">q12_shipping</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="tpch-sql-sugar">
<compilation-unit name="q13_customer_distribution">
<output-dir compare="Text">q13_customer_distribution</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 50682c9..f9fa444 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -1791,14 +1791,6 @@
}
{
operand = BetweenExpr()
- {
- if (operand instanceof VariableExpr) {
- String hint = getHint(token);
- if (hint != null && hint.equals(BROADCAST_JOIN_HINT)) {
- broadcast = true;
- }
- }
- }
(
LOOKAHEAD(2)( <LT> | <GT> | <LE> | <GE> | <EQ> | <NE> | <LG> |<SIMILAR> | (<NOT> { not = true; })? (<LIKE>|<IN>))
@@ -1809,8 +1801,11 @@
annotation = IndexedNLJoinExpressionAnnotation.INSTANCE;
} else if (mhint.equals(SKIP_SECONDARY_INDEX_SEARCH_HINT)) {
annotation = SkipSecondaryIndexSearchExpressionAnnotation.INSTANCE;
+ } else if (mhint.equals(BROADCAST_JOIN_HINT)) {
+ broadcast = true;
}
}
+
String operator = token.image.toLowerCase();
if (operator.equals("<>")){
operator = "!=";
@@ -1820,9 +1815,8 @@
}
if (op == null) {
op = new OperatorExpr();
- op.addOperand(operand, broadcast);
+ op.addOperand(operand, false); // broadcast is always for the right branch
op.setCurrentop(true);
- broadcast = false;
}
try{
op.addOperator(operator);
@@ -1832,16 +1826,9 @@
}
operand = BetweenExpr()
- {
- broadcast = false;
- if (operand instanceof VariableExpr) {
- String hint = getHint(token);
- if (hint != null && hint.equals(BROADCAST_JOIN_HINT)) {
- broadcast = true;
- }
- }
+ {
op.addOperand(operand, broadcast);
- }
+ }
)?
{