[ASTERIXDB-2051][COMP] Fix PushSubplanIntoGroupByRule for complex cases.
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- re-implement PushSubplanIntoGroupByRule and let it handle general cases;
- add an option to LogicalOperatorDeepCopyWithNewVariablesVisitor for
not re-mapping free variables in the given plan subtree.
Change-Id: I969c40112be0506981357a9c41bf9675ae12ffb9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1992
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 725de12..19c6da7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -629,13 +629,13 @@
// Create first copy.
LogicalOperatorDeepCopyWithNewVariablesVisitor firstDeepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(
- context, context, newProbeSubTreeVarMap);
+ context, context, newProbeSubTreeVarMap, true);
ILogicalOperator newProbeSubTree = firstDeepCopyVisitor.deepCopy(probeSubTree.getRoot());
inferTypes(newProbeSubTree, context);
Mutable<ILogicalOperator> newProbeSubTreeRootRef = new MutableObject<ILogicalOperator>(newProbeSubTree);
// Create second copy.
LogicalOperatorDeepCopyWithNewVariablesVisitor secondDeepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(
- context, context, joinInputSubTreeVarMap);
+ context, context, joinInputSubTreeVarMap, true);
ILogicalOperator joinInputSubTree = secondDeepCopyVisitor.deepCopy(probeSubTree.getRoot());
inferTypes(joinInputSubTree, context);
probeSubTree.getRootRef().setValue(joinInputSubTree);
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
new file mode 100644
index 0000000..4b94bf6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : This test case is to verify the fix for issue810
+ * https://code.google.com/p/asterixdb/issues/detail?id=810
+ * Expected Res : SUCCESS
+ * Date : 16th Nov. 2014
+ */
+
+DROP DATAVERSE tpch IF EXISTS;
+CREATE DATAVERSE tpch;
+
+USE tpch;
+
+
+CREATE TYPE LineItemType AS CLOSED {
+ l_orderkey : integer,
+ l_partkey : integer,
+ l_suppkey : integer,
+ l_linenumber : integer,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+};
+
+CREATE DATASET LineItem(LineItemType) PRIMARY KEY l_orderkey,l_linenumber;
+
+
+SELECT l_returnflag AS l_returnflag,
+ l_linestatus AS l_linestatus,
+ coll_count(cheap) AS count_cheaps,
+ coll_count(expensive) AS count_expensives
+FROM LineItem AS l
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag,l.l_linestatus AS l_linestatus
+GROUP AS g
+LET cheap = (
+ SELECT ELEMENT m
+ FROM (FROM g SELECT VALUE l) AS m
+ WHERE m.l_discount > 0.05
+),
+expensive = (
+ SELECT ELEMENT g.l
+ FROM g
+ WHERE g.l.l_discount <= 0.05
+)
+ORDER BY l_returnflag,l_linestatus
+;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
index d9ce6c8..26d6103 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
@@ -8,12 +8,12 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [topK: 100] [$$o_totalprice(DESC), $$o_orderdate(ASC)] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- SORT_GROUP_BY[$$72, $$73] |PARTITIONED|
+ -- SORT_GROUP_BY[$$73, $$74] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- HASH_PARTITION_EXCHANGE [$$72, $$73] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$73, $$74] |PARTITIONED|
-- SORT_GROUP_BY[$$56, $$57] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
@@ -49,12 +49,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- STREAM_SELECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- SORT_GROUP_BY[$$69] |PARTITIONED|
+ -- SORT_GROUP_BY[$$70] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- HASH_PARTITION_EXCHANGE [$$69] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$70] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$58] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
index ab10f2d..f2adbf6 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$27] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$28] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -20,9 +20,9 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$27(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$27] |PARTITIONED|
- -- SORT_GROUP_BY[$$18, $$24] |PARTITIONED|
+ -- STABLE_SORT [$$28(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$28] |PARTITIONED|
+ -- SORT_GROUP_BY[$$18, $$25] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
index cad2fbb..238736a 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$42, $$43] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$44, $$45] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -13,8 +13,8 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$42(ASC), $$43(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$42, $$43] |PARTITIONED|
+ -- STABLE_SORT [$$44(ASC), $$45(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$44, $$45] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$30, $$31] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
new file mode 100644
index 0000000..9a7df35
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
@@ -0,0 +1,37 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$44, $$45] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$44(ASC), $$45(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$44, $$45] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$31, $$32] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$31(ASC), $$32(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
index 1c56f1c..18b4218 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$42, $$43] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$44, $$45] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -13,8 +13,8 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$42(ASC), $$43(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$42, $$43] |PARTITIONED|
+ -- STABLE_SORT [$$44(ASC), $$45(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$44, $$45] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$32, $$33] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
@@ -31,9 +31,7 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
index 13fb7e1..2d75ff0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
@@ -3,12 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- SORT_GROUP_BY[$$19] |PARTITIONED|
+ -- SORT_GROUP_BY[$$20] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- HASH_PARTITION_EXCHANGE [$$19] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$20] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$16] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
index fe1f67a..4405b9d 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
@@ -13,12 +13,12 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$nation_key(ASC)] |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$nation_key] |PARTITIONED|
- -- SORT_GROUP_BY[$$69, $$70] |PARTITIONED|
+ -- SORT_GROUP_BY[$$70, $$71] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- HASH_PARTITION_EXCHANGE [$$69, $$70] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$70, $$71] |PARTITIONED|
-- SORT_GROUP_BY[$$50, $$54] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
@@ -31,7 +31,7 @@
-- HASH_PARTITION_EXCHANGE [$$56] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$54][$$63] |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$54][$$64] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
@@ -48,7 +48,7 @@
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$63] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$64] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
index e229e75..1d24a80 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$75, $$76] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$69, $$70] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -17,8 +17,8 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$75(ASC), $$76(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$75, $$76] |PARTITIONED|
+ -- STABLE_SORT [$$69(ASC), $$70(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$69, $$70] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$42, $$43] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
index ca941bf..3cfd8af 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$34, $$35] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$36, $$37] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -13,8 +13,8 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$34(ASC), $$35(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$34, $$35] |PARTITIONED|
+ -- STABLE_SORT [$$36(ASC), $$37(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$36, $$37] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$23, $$24] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
index a5124c3..2eb7603 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
@@ -27,34 +27,34 @@
-- UNNEST |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- SUBPLAN |PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- STREAM_SELECT |LOCAL|
- -- ASSIGN |LOCAL|
- -- UNNEST |LOCAL|
- -- SUBPLAN |LOCAL|
- {
- -- AGGREGATE |LOCAL|
- -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |LOCAL|
- -- UNNEST |LOCAL|
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97] |PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC), $$97(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97] |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- ASSIGN |LOCAL|
+ -- UNNEST |LOCAL|
+ -- SUBPLAN |LOCAL|
+ {
+ -- AGGREGATE |LOCAL|
+ -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |LOCAL|
+ -- UNNEST |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC), $$97(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index cb89a73..cef387a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -43,12 +44,15 @@
private final IVariableContext varContext;
private final Map<LogicalVariable, LogicalVariable> inVarMapping;
private final Map<LogicalVariable, LogicalVariable> outVarMapping;
+ private final Set<LogicalVariable> freeVars;
public LogicalExpressionDeepCopyWithNewVariablesVisitor(IVariableContext varContext,
- Map<LogicalVariable, LogicalVariable> inVarMapping, Map<LogicalVariable, LogicalVariable> variableMapping) {
+ Map<LogicalVariable, LogicalVariable> inVarMapping, Map<LogicalVariable, LogicalVariable> variableMapping,
+ Set<LogicalVariable> freeVars) {
this.varContext = varContext;
this.inVarMapping = inVarMapping;
this.outVarMapping = variableMapping;
+ this.freeVars = freeVars;
}
public ILogicalExpression deepCopy(ILogicalExpression expr) throws AlgebricksException {
@@ -146,6 +150,9 @@
public ILogicalExpression visitVariableReferenceExpression(VariableReferenceExpression expr, Void arg)
throws AlgebricksException {
LogicalVariable var = expr.getVariableReference();
+ if (freeVars.contains(var)) {
+ return expr;
+ }
LogicalVariable givenVarReplacement = inVarMapping.get(var);
if (givenVarReplacement != null) {
outVarMapping.put(var, givenVarReplacement);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 0da9110..934577f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -19,9 +19,11 @@
package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -38,10 +40,10 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -52,7 +54,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
@@ -64,10 +65,12 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
/**
@@ -87,7 +90,13 @@
// Key: New variable in the new plan. Value: The old variable in the
// original plan.
- private final LinkedHashMap<LogicalVariable, LogicalVariable> outputVarToInputVarMapping;
+ private final LinkedHashMap<LogicalVariable, LogicalVariable> outputVarToInputVarMapping = new LinkedHashMap<>();
+
+ // Free variables: variables that shouldn't be deep copied, i.e., mapped.
+ private final Set<LogicalVariable> freeVars = new HashSet<>();
+
+ // Whether free variables in the given plan subtree should be reused.
+ private final boolean reuseFreeVars;
/**
* @param varContext
@@ -96,12 +105,20 @@
* the type context.
*/
public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext) {
- this.varContext = varContext;
- this.typeContext = typeContext;
- this.inputVarToOutputVarMapping = new LinkedHashMap<>();
- this.outputVarToInputVarMapping = new LinkedHashMap<>();
- this.exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext,
- outputVarToInputVarMapping, inputVarToOutputVarMapping);
+ this(varContext, typeContext, new LinkedHashMap<>(), false);
+ }
+
+ /**
+ * @param varContext
+ * , the variable context.
+ * @param typeContext
+ * the type context.
+ * @param reuseFreeVars
+ * whether free variables in the given plan tree should be reused.
+ */
+ public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext,
+ boolean reuseFreeVars) {
+ this(varContext, typeContext, new LinkedHashMap<>(), reuseFreeVars);
}
/**
@@ -113,15 +130,17 @@
* Variable mapping keyed by variables in the original plan.
* Those variables are replaced by their corresponding value in
* the map in the copied plan.
+ * @param reuseFreeVars
+ * whether free variables in the given plan tree should be reused.
*/
public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext,
- LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping) {
+ LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping, boolean reuseFreeVars) {
this.varContext = varContext;
this.typeContext = typeContext;
this.inputVarToOutputVarMapping = inVarMapping;
- this.outputVarToInputVarMapping = new LinkedHashMap<>();
- exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping,
- inputVarToOutputVarMapping);
+ this.exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping,
+ inputVarToOutputVarMapping, freeVars);
+ this.reuseFreeVars = reuseFreeVars;
}
private void copyAnnotations(ILogicalOperator src, ILogicalOperator dest) {
@@ -137,6 +156,11 @@
if (op == null) {
return null;
}
+ if (reuseFreeVars) {
+ // If the reuseFreeVars flag is set, we collect all free variables in the
+ // given operator subtree and do not re-map them in the deep-copied plan.
+ OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) op, freeVars);
+ }
ILogicalOperator opCopy = op.accept(this, arg);
if (typeContext != null) {
OperatorManipulationUtil.computeTypeEnvironmentBottomUp(opCopy, typeContext);
@@ -207,6 +231,9 @@
if (var == null) {
return null;
}
+ if (freeVars.contains(var)) {
+ return var;
+ }
LogicalVariable givenVarReplacement = outputVarToInputVarMapping.get(var);
if (givenVarReplacement != null) {
inputVarToOutputVarMapping.put(var, givenVarReplacement);
@@ -247,6 +274,7 @@
}
public void reset() {
+ freeVars.clear();
inputVarToOutputVarMapping.clear();
outputVarToInputVarMapping.clear();
}
@@ -563,10 +591,6 @@
return opCopy;
}
- public LinkedHashMap<LogicalVariable, LogicalVariable> getOutputToInputVariableMapping() {
- return outputVarToInputVarMapping;
- }
-
public LinkedHashMap<LogicalVariable, LogicalVariable> getInputToOutputVariableMapping() {
return inputVarToOutputVarMapping;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 8a8fe6d..8d00696 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -20,10 +20,12 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -38,6 +40,8 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.
+ LogicalOperatorDeepCopyWithNewVariablesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -196,6 +200,14 @@
return new ALogicalPlanImpl(newRoots);
}
+ public static Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyWithNewVars(
+ ILogicalOperator root, IOptimizationContext ctx) throws AlgebricksException {
+ LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new
+ LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, null, true);
+ ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root);
+ return Pair.of(newRoot, deepCopyVisitor.getInputToOutputVariableMapping());
+ }
+
private static void setDataSource(ILogicalPlan plan, ILogicalOperator dataSource) {
for (Mutable<ILogicalOperator> rootRef : plan.getRoots()) {
setDataSource(rootRef, dataSource);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 8b36a68..463f214 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -75,7 +75,7 @@
* collection provided.
*
* @param op
- * @param vars
+ * @param freeVars
* - The collection to which the free variables will be added.
*/
public static void getFreeVariablesInSelfOrDesc(AbstractLogicalOperator op, Set<LogicalVariable> freeVars)
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index e73f2a5..ed4196b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -322,7 +322,7 @@
// Finds the reference of the bottom-most operator in the pipeline that
// should not be pushed to the combiner group-by.
Mutable<ILogicalOperator> currentOpRef = new MutableObject<ILogicalOperator>(nestedGby);
- Mutable<ILogicalOperator> bottomOpRef = findBottomOpRefStayInOldGby(currentOpRef);
+ Mutable<ILogicalOperator> bottomOpRef = findBottomOpRefStayInOldGby(nestedGby, currentOpRef);
// Adds the used variables in the pipeline from <code>currentOpRef</code> to <code>bottomOpRef</code>
// into the group-by keys for the introduced combiner group-by operator.
@@ -392,16 +392,29 @@
* Find the bottom-most nested operator reference in the query pipeline rooted at <code>currentOpRef</code>
* that cannot be pushed into the combiner group-by operator.
*
- * @param currentOpRef
+ * @param nestedGby
+ * the nested group-by operator.
+ * @param currentOpRef,the
+ * reference of the current op.
* @return the bottom-most reference of a select operator
*/
- private Mutable<ILogicalOperator> findBottomOpRefStayInOldGby(Mutable<ILogicalOperator> currentOpRef)
+ private Mutable<ILogicalOperator> findBottomOpRefStayInOldGby(GroupByOperator nestedGby,
+ Mutable<ILogicalOperator> currentOpRef)
throws AlgebricksException {
+ Set<LogicalVariable> usedVarsInNestedGby = new HashSet<>();
+ // Collects used variables in nested pipelines.
+ for (ILogicalPlan nestedPlan : nestedGby.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> rootOpRef : nestedPlan.getRoots()) {
+ VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), usedVarsInNestedGby);
+ }
+ }
Mutable<ILogicalOperator> bottomOpRef = currentOpRef;
while (currentOpRef.getValue().getInputs().size() > 0) {
- Set<LogicalVariable> producedVars = new HashSet<>();
- VariableUtilities.getProducedVariables(currentOpRef.getValue(), producedVars);
- if (currentOpRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT || !producedVars.isEmpty()) {
+ // Used for checking the dependency between nestedGby and the current operator
+ Set<LogicalVariable> dependingVars = new HashSet<>();
+ VariableUtilities.getProducedVariables(currentOpRef.getValue(), dependingVars);
+ dependingVars.removeAll(usedVarsInNestedGby);
+ if (currentOpRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT || !dependingVars.isEmpty()) {
bottomOpRef = currentOpRef;
}
currentOpRef = currentOpRef.getValue().getInputs().get(0);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 3a86565..af95ecd 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -20,13 +20,18 @@
package org.apache.hyracks.algebricks.rewriter.rules.subplan;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Deque;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -40,7 +45,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
/**
@@ -51,151 +58,209 @@
*/
public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule {
- /** Stores used variables above the current operator. */
- private final Set<LogicalVariable> usedVarsSoFar = new HashSet<LogicalVariable>();
+ /** The pointer to the topmost operator */
+ private Mutable<ILogicalOperator> rootRef;
+ /** Whether the rule has ever been invoked */
+ private boolean invoked = false;
@Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
- return false;
+ if (!invoked) {
+ rootRef = opRef;
+ invoked = true;
+ }
+ return rewriteForOperator(rootRef, opRef, context);
}
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
- ILogicalOperator parentOperator = opRef.getValue();
- if (context.checkIfInDontApplySet(this, parentOperator)) {
- return false;
- }
- context.addToDontApplySet(this, parentOperator);
- VariableUtilities.getUsedVariables(parentOperator, usedVarsSoFar);
- if (parentOperator.getInputs().size() <= 0) {
- return false;
- }
+ // The core rewriting function for an operator.
+ private boolean rewriteForOperator(Mutable<ILogicalOperator> rootRef, Mutable<ILogicalOperator> opRef,
+ IOptimizationContext context) throws AlgebricksException {
boolean changed = false;
- GroupByOperator gby = null;
+ ILogicalOperator parentOperator = opRef.getValue();
for (Mutable<ILogicalOperator> ref : parentOperator.getInputs()) {
- AbstractLogicalOperator op = (AbstractLogicalOperator) ref.getValue();
- /** Only processes subplan operator. */
- List<SubplanOperator> subplans = new ArrayList<SubplanOperator>();
- if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
- while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
- SubplanOperator currentSubplan = (SubplanOperator) op;
- subplans.add(currentSubplan);
- op = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ ILogicalOperator op = ref.getValue();
+ // Only processes subplan operator.
+ Deque<SubplanOperator> subplans = new ArrayDeque<>();
+ if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+ // Recursively rewrites the child plan.
+ changed |= rewriteForOperator(rootRef, ref, context);
+ continue;
+ }
+ while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+ SubplanOperator currentSubplan = (SubplanOperator) op;
+ // Recursively rewrites the pipelines inside a nested subplan.
+ for (ILogicalPlan subplan : currentSubplan.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> nestedRootRef : subplan.getRoots()) {
+ changed |= rewriteForOperator(nestedRootRef, nestedRootRef, context);
+ }
}
- /** Only processes the case a group-by operator is the input of the subplan operators. */
- if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
- gby = (GroupByOperator) op;
- List<ILogicalPlan> newGbyNestedPlans = new ArrayList<ILogicalPlan>();
- for (SubplanOperator subplan : subplans) {
- List<ILogicalPlan> subplanNestedPlans = subplan.getNestedPlans();
- List<ILogicalPlan> gbyNestedPlans = gby.getNestedPlans();
- List<ILogicalPlan> subplanNestedPlansToRemove = new ArrayList<ILogicalPlan>();
- for (ILogicalPlan subplanNestedPlan : subplanNestedPlans) {
- List<Mutable<ILogicalOperator>> rootOpRefs = subplanNestedPlan.getRoots();
- List<Mutable<ILogicalOperator>> rootOpRefsToRemove = new ArrayList<Mutable<ILogicalOperator>>();
- for (Mutable<ILogicalOperator> rootOpRef : rootOpRefs) {
- /** Gets free variables in the root operator of a nested plan and its descent. */
- Set<LogicalVariable> freeVars = new ListSet<LogicalVariable>();
- VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), freeVars);
- Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
- VariableUtilities.getProducedVariablesInDescendantsAndSelf(rootOpRef.getValue(),
- producedVars);
- freeVars.removeAll(producedVars);
- /** * Checks whether the above freeVars are all contained in live variables * of one nested plan inside the group-by operator. * If yes, then the subplan can be pushed into the nested plan of the group-by. */
- for (ILogicalPlan gbyNestedPlanOriginal : gbyNestedPlans) {
- // add a subplan in the original gby
- if (!newGbyNestedPlans.contains(gbyNestedPlanOriginal)) {
- newGbyNestedPlans.add(gbyNestedPlanOriginal);
- }
-
- // add a pushed subplan
- ILogicalPlan gbyNestedPlan = OperatorManipulationUtil.deepCopy(
- gbyNestedPlanOriginal, context);
- List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
- for (int rootIndex = 0; rootIndex < gbyRootOpRefs.size(); rootIndex++) {
- //set the nts for a original subplan
- Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlanOriginal
- .getRoots().get(rootIndex);
- Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef);
- NestedTupleSourceOperator originalNts = (NestedTupleSourceOperator) originalGbyNtsRef
- .getValue();
- originalNts.setDataSourceReference(new MutableObject<ILogicalOperator>(gby));
-
- //push a new subplan if possible
- Mutable<ILogicalOperator> gbyRootOpRef = gbyRootOpRefs.get(rootIndex);
- Set<LogicalVariable> liveVars = new ListSet<LogicalVariable>();
- VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
- if (liveVars.containsAll(freeVars)) {
- /** Does the actual push. */
- Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
- ntsRef.setValue(gbyRootOpRef.getValue());
- // Removes unused vars.
- AggregateOperator aggOp = (AggregateOperator) gbyRootOpRef.getValue();
- for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
- if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
- aggOp.getVariables().remove(varIndex);
- aggOp.getExpressions().remove(varIndex);
- }
- }
-
- gbyRootOpRef.setValue(rootOpRef.getValue());
- rootOpRefsToRemove.add(rootOpRef);
-
- // Sets the nts for a new pushed plan.
- Mutable<ILogicalOperator> oldGbyNtsRef = downToNts(gbyRootOpRef);
- NestedTupleSourceOperator nts = (NestedTupleSourceOperator) oldGbyNtsRef
- .getValue();
- nts.setDataSourceReference(new MutableObject<ILogicalOperator>(gby));
-
- newGbyNestedPlans.add(gbyNestedPlan);
- changed = true;
- continue;
- }
- }
- }
- }
- rootOpRefs.removeAll(rootOpRefsToRemove);
- if (rootOpRefs.size() == 0) {
- subplanNestedPlansToRemove.add(subplanNestedPlan);
- }
- }
- subplanNestedPlans.removeAll(subplanNestedPlansToRemove);
- }
- if (changed) {
- ref.setValue(gby);
- gby.getNestedPlans().clear();
- gby.getNestedPlans().addAll(newGbyNestedPlans);
- }
+ subplans.addFirst(currentSubplan);
+ op = op.getInputs().get(0).getValue();
+ }
+ // Only processes the case a group-by operator is the input of the subplan operators.
+ if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+ continue;
+ }
+ GroupByOperator gby = (GroupByOperator) op;
+ // Recursively rewrites the pipelines inside a nested subplan.
+ for (ILogicalPlan subplan : gby.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> nestedRootRef : subplan.getRoots()) {
+ changed |= rewriteForOperator(nestedRootRef, nestedRootRef, context);
}
}
+ changed |= pushSubplansIntoGroupBy(rootRef, parentOperator, subplans, gby, context);
}
- if (changed) {
- cleanup(gby);
- context.computeAndSetTypeEnvironmentForOperator(gby);
- context.computeAndSetTypeEnvironmentForOperator(parentOperator);
+ return changed;
+ }
+
+ // Pushes subplans into the group by operator.
+ private boolean pushSubplansIntoGroupBy(Mutable<ILogicalOperator> currentRootRef, ILogicalOperator parentOperator,
+ Deque<SubplanOperator> subplans, GroupByOperator gby, IOptimizationContext context)
+ throws AlgebricksException {
+ boolean changed = false;
+ List<ILogicalPlan> newGbyNestedPlans = new ArrayList<>();
+ List<ILogicalPlan> originalNestedPlansInGby = gby.getNestedPlans();
+
+ // Adds all original subplans from the group by.
+ for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) {
+ newGbyNestedPlans.add(gbyNestedPlanOriginal);
}
+
+ // Tries to push subplans into the group by.
+ Iterator<SubplanOperator> subplanOperatorIterator = subplans.iterator();
+ while (subplanOperatorIterator.hasNext()) {
+ SubplanOperator subplan = subplanOperatorIterator.next();
+ Iterator<ILogicalPlan> subplanNestedPlanIterator = subplan.getNestedPlans().iterator();
+ while (subplanNestedPlanIterator.hasNext()) {
+ ILogicalPlan subplanNestedPlan = subplanNestedPlanIterator.next();
+ List<Mutable<ILogicalOperator>> upperSubplanRootRefs = subplanNestedPlan.getRoots();
+ Iterator<Mutable<ILogicalOperator>> upperSubplanRootRefIterator = upperSubplanRootRefs.iterator();
+ while (upperSubplanRootRefIterator.hasNext()) {
+ Mutable<ILogicalOperator> rootOpRef = upperSubplanRootRefIterator.next();
+
+ // Collects free variables in the root operator of a nested plan and its descent.
+ Set<LogicalVariable> freeVars = new ListSet<>();
+ OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) rootOpRef.getValue(),
+ freeVars);
+
+ // Checks whether the above freeVars are all contained in live variables * of one nested plan
+ // inside the group-by operator. If yes, then the subplan can be pushed into the nested plan
+ // of the group-by.
+ for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) {
+ ILogicalPlan gbyNestedPlan = OperatorManipulationUtil.deepCopy(gbyNestedPlanOriginal, context);
+ List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
+ for (int rootIndex = 0; rootIndex < gbyRootOpRefs.size(); rootIndex++) {
+ // Sets the nts for a original subplan.
+ Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlan.getRoots().get(rootIndex);
+ Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef);
+ NestedTupleSourceOperator originalNts = (NestedTupleSourceOperator) originalGbyNtsRef
+ .getValue();
+ originalNts.setDataSourceReference(new MutableObject<>(gby));
+
+ // Pushes a new subplan if possible.
+ Mutable<ILogicalOperator> gbyRootOpRef = gbyRootOpRefs.get(rootIndex);
+ Set<LogicalVariable> liveVars = new ListSet<>();
+ VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
+ if (!liveVars.containsAll(freeVars)) {
+ continue;
+ }
+
+ AggregateOperator aggOp = (AggregateOperator) gbyRootOpRef.getValue();
+ for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
+ if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
+ aggOp.getVariables().remove(varIndex);
+ aggOp.getExpressions().remove(varIndex);
+ }
+ }
+
+ // Copy the original nested pipeline inside the group-by.
+ Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> copiedAggOpAndVarMap =
+ OperatorManipulationUtil.deepCopyWithNewVars(aggOp, context);
+ ILogicalOperator newBottomAgg = copiedAggOpAndVarMap.getLeft();
+
+ // Substitutes variables in the upper nested pipe line.
+ VariableUtilities.substituteVariablesInDescendantsAndSelf(rootOpRef.getValue(),
+ copiedAggOpAndVarMap.getRight(), context);
+
+ // Does the actual push.
+ Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
+ ntsRef.setValue(newBottomAgg);
+ gbyRootOpRef.setValue(rootOpRef.getValue());
+
+ // Sets the nts for a new pushed plan.
+ Mutable<ILogicalOperator> oldGbyNtsRef = downToNts(new MutableObject<>(newBottomAgg));
+ NestedTupleSourceOperator nts = (NestedTupleSourceOperator) oldGbyNtsRef.getValue();
+ nts.setDataSourceReference(new MutableObject<>(gby));
+
+ OperatorManipulationUtil.computeTypeEnvironmentBottomUp(rootOpRef.getValue(), context);
+ newGbyNestedPlans.add(new ALogicalPlanImpl(rootOpRef));
+
+ upperSubplanRootRefIterator.remove();
+ changed |= true;
+ break;
+ }
+ }
+ }
+
+ if (upperSubplanRootRefs.isEmpty()) {
+ subplanNestedPlanIterator.remove();
+ }
+ }
+ if (subplan.getNestedPlans().isEmpty()) {
+ subplanOperatorIterator.remove();
+ }
+ }
+
+ // Resets the nested subplans for the group-by operator.
+ gby.getNestedPlans().clear();
+ gby.getNestedPlans().addAll(newGbyNestedPlans);
+
+ // Connects the group-by operator with its parent operator.
+ ILogicalOperator parent = !subplans.isEmpty() ? subplans.getFirst() : parentOperator;
+ parent.getInputs().get(0).setValue(gby);
+
+ // Removes unnecessary pipelines inside the group by operator.
+ cleanup(currentRootRef.getValue(), gby);
+
+ // Computes type environments.
+ context.computeAndSetTypeEnvironmentForOperator(gby);
+ context.computeAndSetTypeEnvironmentForOperator(parent);
return changed;
}
/**
* Removes unused aggregation variables (and expressions)
*
- * @param gby
+ * @param rootOp,
+ * the root operator of a plan or nested plan.
+ * @param gby,
+ * the group-by operator.
* @throws AlgebricksException
*/
- private void cleanup(GroupByOperator gby) throws AlgebricksException {
- for (ILogicalPlan nestedPlan : gby.getNestedPlans()) {
- for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
- AggregateOperator aggOp = (AggregateOperator) rootRef.getValue();
+ private void cleanup(ILogicalOperator rootOp, GroupByOperator gby) throws AlgebricksException {
+ Set<LogicalVariable> freeVars = new HashSet<>();
+ OperatorPropertiesUtil.getFreeVariablesInPath(rootOp, gby, freeVars);
+ Iterator<ILogicalPlan> nestedPlanIterator = gby.getNestedPlans().iterator();
+ while (nestedPlanIterator.hasNext()) {
+ ILogicalPlan nestedPlan = nestedPlanIterator.next();
+ Iterator<Mutable<ILogicalOperator>> nestRootRefIterator = nestedPlan.getRoots().iterator();
+ while (nestRootRefIterator.hasNext()) {
+ Mutable<ILogicalOperator> nestRootRef = nestRootRefIterator.next();
+ AggregateOperator aggOp = (AggregateOperator) nestRootRef.getValue();
for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
- if (!usedVarsSoFar.contains(aggOp.getVariables().get(varIndex))) {
+ if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
aggOp.getVariables().remove(varIndex);
aggOp.getExpressions().remove(varIndex);
}
}
+ if (aggOp.getVariables().isEmpty()) {
+ nestRootRefIterator.remove();
+ }
}
-
+ if (nestedPlan.getRoots().isEmpty()) {
+ nestedPlanIterator.remove();
+ }
}
}