ASTERIXDB-1127: fix ExtractCommonOperatorsRule.
Change-Id: I16933a4b72432b5fbd523ca80ce6426f6b6743a9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/691
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Pouria Pirzadeh <pouria.pirzadeh@gmail.com>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index d130d4c..ac2ae5c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -233,7 +233,7 @@
AlgebricksPartitionConstraint pc = partitionConstraintMap.get(parentOp);
if (pc != null) {
opConstraint = pc;
- } else if (opInputs == null || opInputs.size() == 0) {
+ } else if ((opInputs == null || opInputs.size() == 0) && finalPass) {
opConstraint = new AlgebricksCountPartitionConstraint(1);
}
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index 7b04bd5..2a28d2e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -35,7 +35,7 @@
public class HeuristicOptimizer {
- public static PhysicalOperatorTag[] hyraxOperators = new PhysicalOperatorTag[] {
+ public static PhysicalOperatorTag[] hyracksOperators = new PhysicalOperatorTag[] {
PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
@@ -44,8 +44,8 @@
PhysicalOperatorTag.UNION_ALL };
public static PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
- public static boolean isHyraxOp(PhysicalOperatorTag opTag) {
- for (PhysicalOperatorTag t : hyraxOperators) {
+ public static boolean isHyracksOp(PhysicalOperatorTag opTag) {
+ for (PhysicalOperatorTag t : hyracksOperators) {
if (t == opTag) {
return true;
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index f13187f..3b31f6d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -47,6 +47,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
@@ -67,8 +68,9 @@
&& op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
- if (!roots.contains(op))
+ if (!roots.contains(op)) {
roots.add(new MutableObject<ILogicalOperator>(op));
+ }
return false;
}
@@ -89,10 +91,12 @@
topDownMaterialization(roots);
genCandidates(context);
removeTrivialShare();
- if (equivalenceClasses.size() > 0)
+ if (equivalenceClasses.size() > 0) {
changed = rewrite(context);
- if (!rewritten)
+ }
+ if (!rewritten) {
rewritten = changed;
+ }
equivalenceClasses.clear();
childrenToParents.clear();
opToCandidateInputs.clear();
@@ -110,22 +114,27 @@
for (int i = candidates.size() - 1; i >= 0; i--) {
Mutable<ILogicalOperator> opRef = candidates.get(i);
AbstractLogicalOperator aop = (AbstractLogicalOperator) opRef.getValue();
- if (aop.getOperatorTag() == LogicalOperatorTag.EXCHANGE)
+ if (aop.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
aop = (AbstractLogicalOperator) aop.getInputs().get(0).getValue();
- if (aop.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE)
+ }
+ if (aop.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
candidates.remove(i);
+ }
}
}
- for (int i = equivalenceClasses.size() - 1; i >= 0; i--)
- if (equivalenceClasses.get(i).size() < 2)
+ for (int i = equivalenceClasses.size() - 1; i >= 0; i--) {
+ if (equivalenceClasses.get(i).size() < 2) {
equivalenceClasses.remove(i);
+ }
+ }
}
private boolean rewrite(IOptimizationContext context) throws AlgebricksException {
boolean changed = false;
for (List<Mutable<ILogicalOperator>> members : equivalenceClasses) {
- if (rewriteForOneEquivalentClass(members, context))
+ if (rewriteForOneEquivalentClass(members, context)) {
changed = true;
+ }
}
return changed;
}
@@ -191,11 +200,13 @@
List<LogicalVariable> liveVarsNew = new ArrayList<LogicalVariable>();
VariableUtilities.getLiveVariables(candidate.getValue(), liveVarsNew);
ArrayList<Mutable<ILogicalExpression>> assignExprs = new ArrayList<Mutable<ILogicalExpression>>();
- for (LogicalVariable liveVar : liveVarsNew)
+ for (LogicalVariable liveVar : liveVarsNew) {
assignExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar)));
+ }
for (Mutable<ILogicalOperator> ref : group) {
- if (ref.equals(candidate))
+ if (ref.equals(candidate)) {
continue;
+ }
ArrayList<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
Map<LogicalVariable, LogicalVariable> variableMappingBack = new HashMap<LogicalVariable, LogicalVariable>();
IsomorphismUtilities.mapVariablesTopDown(ref.getValue(), candidate.getValue(), variableMappingBack);
@@ -227,20 +238,13 @@
for (Mutable<ILogicalOperator> parentOpRef : parentOpList) {
AbstractLogicalOperator parentOp = (AbstractLogicalOperator) parentOpRef.getValue();
int index = parentOp.getInputs().indexOf(ref);
- if (parentOp.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
- AbstractLogicalOperator parentOpNext = (AbstractLogicalOperator) childrenToParents
- .get(parentOpRef).get(0).getValue();
- if (parentOpNext.isMap()) {
- index = parentOpNext.getInputs().indexOf(parentOpRef);
- parentOp = parentOpNext;
- }
- }
-
ILogicalOperator childOp = parentOp.getOperatorTag() == LogicalOperatorTag.PROJECT ? assignOperator
: projectOperator;
- if (parentOp.isMap()) {
+ if (!HeuristicOptimizer.isHyracksOp(parentOp.getPhysicalOperator().getOperatorTag())) {
parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(childOp));
} else {
+ // If the parent operator is a hyracks operator,
+ // an extra one-to-one exchange is needed.
AbstractLogicalOperator exchg = new ExchangeOperator();
exchg.setPhysicalOperator(new OneToOneExchangePOperator());
exchg.setExecutionMode(childOp.getExecutionMode());
@@ -270,16 +274,19 @@
if (candidates.size() > 0) {
for (Mutable<ILogicalOperator> opRef : candidates) {
List<Mutable<ILogicalOperator>> refs = childrenToParents.get(opRef);
- if (refs != null)
+ if (refs != null) {
currentLevelOpRefs.addAll(refs);
+ }
}
}
- if (currentLevelOpRefs.size() == 0)
+ if (currentLevelOpRefs.size() == 0) {
continue;
+ }
candidatesGrow(currentLevelOpRefs, candidates);
}
- if (currentLevelOpRefs.size() == 0)
+ if (currentLevelOpRefs.size() == 0) {
break;
+ }
prune(context);
}
if (equivalenceClasses.size() < 1 && previousEquivalenceClasses.size() > 0) {
@@ -301,8 +308,9 @@
}
opRefList.add(op);
}
- if (op.getValue().getInputs().size() == 0)
+ if (op.getValue().getInputs().size() == 0) {
candidates.add(op);
+ }
}
if (equivalenceClasses.size() > 0) {
equivalenceClasses.get(0).addAll(candidates);
@@ -344,10 +352,12 @@
}
}
}
- if (!validCandidate)
+ if (!validCandidate) {
continue;
- if (!candidates.contains(op))
+ }
+ if (!candidates.contains(op)) {
candidates.add(op);
+ }
}
}
@@ -361,8 +371,9 @@
equivalenceClasses.clear();
for (List<Mutable<ILogicalOperator>> candidates : previousEquivalenceClasses) {
boolean[] reserved = new boolean[candidates.size()];
- for (int i = 0; i < reserved.length; i++)
+ for (int i = 0; i < reserved.length; i++) {
reserved[i] = false;
+ }
for (int i = candidates.size() - 1; i >= 0; i--) {
if (reserved[i] == false) {
List<Mutable<ILogicalOperator>> equivalentClass = new ArrayList<Mutable<ILogicalOperator>>();