ASTERIXDB-221: reduce unneceesary partitioning for hash joins.
For a hash join, start top-down data property optimization from
a partitioning-compatiable child, and hence the other child's
partitioning requirement could be updated.
Change-Id: I835ea712c2f427149d45464fcb3841b8d33f6507
Reviewed-on: https://asterix-gerrit.ics.uci.edu/395
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wenhai Li <lwhaymail@yahoo.com>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 1091e1a..f5ea5f1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -138,11 +138,16 @@
Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
+ List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent)) ? keysRightBranch
+ : keysLeftBranch;
+ List<LogicalVariable> keysSecond = keysFirst == keysRightBranch ? keysLeftBranch
+ : keysRightBranch;
for (LogicalVariable r : uppreq.getColumnSet()) {
EquivalenceClass ecSnd = eqmap.get(r);
boolean found = false;
int j = 0;
- for (LogicalVariable rvar : keysRightBranch) {
+ for (LogicalVariable rvar : keysFirst) {
if (rvar == r || ecSnd != null && eqmap.get(rvar) == ecSnd) {
found = true;
break;
@@ -151,9 +156,9 @@
}
if (!found) {
throw new IllegalStateException("Did not find a variable equivalent to "
- + r + " among " + keysRightBranch);
+ + r + " among " + keysFirst);
}
- LogicalVariable v2 = keysLeftBranch.get(j);
+ LogicalVariable v2 = keysSecond.get(j);
EquivalenceClass ecFst = eqmap.get(v2);
for (LogicalVariable vset1 : set1) {
if (vset1 == v2 || ecFst != null && eqmap.get(vset1) == ecFst) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index af40f67..ae9f4f1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -132,11 +132,10 @@
case UNORDERED_PARTITIONED: {
UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
UnorderedPartitionedProperty ud = (UnorderedPartitionedProperty) dlvd;
- if (mayExpandProperties) {
- return isPrefixOf(ud.getColumnSet().iterator(), ur.getColumnSet().iterator());
- } else {
- return ur.getColumnSet().equals(ud.getColumnSet());
- }
+ if (mayExpandProperties)
+ return (!ud.getColumnSet().isEmpty() && ur.getColumnSet().containsAll(ud.getColumnSet()));
+ else
+ return (ud.getColumnSet().equals(ur.getColumnSet()));
}
case ORDERED_PARTITIONED: {
UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 4df9db7..2181efa 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -27,7 +27,6 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -154,6 +153,43 @@
return changed;
}
+ // Gets the index of a child to start top-down data property enforcement.
+ // If there is a partitioning-compatible child with the operator in opRef, start from this child;
+ // otherwise, start from child zero.
+ private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
+ IOptimizationContext context) throws AlgebricksException {
+ IPhysicalPropertiesVector[] reqdProperties = null;
+ if (pr != null) {
+ reqdProperties = pr.getRequiredProperties();
+ }
+
+ List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<IPartitioningProperty>();
+ for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+ AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+ deliveredPartitioningPropertiesFromChildren.add(child.getDeliveredPhysicalProperties()
+ .getPartitioningProperty());
+ }
+ int partitioningCompatibleChild = 0;
+ for (int i = 0; i < op.getInputs().size(); i++) {
+ IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i);
+ if (reqdProperties == null
+ || reqdProperties[i] == null
+ || reqdProperties[i].getPartitioningProperty() == null
+ || deliveredPropertyFromChild == null
+ || reqdProperties[i].getPartitioningProperty().getPartitioningType() != deliveredPartitioningPropertiesFromChildren
+ .get(i).getPartitioningType()) {
+ continue;
+ }
+ IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty();
+ // If child i's delivered partitioning property already satisfies the required property, stop and return the child index.
+ if (PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, deliveredPropertyFromChild, true)) {
+ partitioningCompatibleChild = i;
+ break;
+ }
+ }
+ return partitioningCompatibleChild;
+ }
+
private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required,
boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {
@@ -201,23 +237,31 @@
}
}
+ // The child index of the child operator to optimize first.
+ int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
IPartitioningProperty firstDeliveredPartitioning = null;
- int i = 0;
- for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
- AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+ // Enforce data properties in a top-down manner.
+ for (int j = 0; j < op.getInputs().size(); j++) {
+ // Starts from a partitioning-compatible child if any to loop over all children.
+ int childIndex = (j + startChildIndex) % op.getInputs().size();
+ IPhysicalPropertiesVector requiredProperty = reqdProperties[childIndex];
+ AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Properties delivered by " + child.getPhysicalOperator()
+ ": " + delivered + "\n");
IPartitioningRequirementsCoordinator prc = pr.getPartitioningCoordinator();
+ // Coordinates requirements by looking at the firstDeliveredPartitioning.
Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements(
- reqdProperties[i].getPartitioningProperty(), firstDeliveredPartitioning, op, context);
+ requiredProperty.getPartitioningProperty(), firstDeliveredPartitioning, op, context);
boolean mayExpandPartitioningProperties = pbpp.first;
IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second,
- reqdProperties[i].getLocalProperties());
+ requiredProperty.getLocalProperties());
AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for " + child.getPhysicalOperator()
+ ": " + rqd + "\n");
+ // The partitioning property of reqdProperties[childIndex] could be updated here because
+ // rqd.getPartitioningProperty() is the same object instance as requiredProperty.getPartitioningProperty().
IPhysicalPropertiesVector diff = delivered.getUnsatisfiedPropertiesFrom(rqd,
mayExpandPartitioningProperties, context.getEquivalenceClassMap(child), context.getFDList(child));
@@ -227,9 +271,9 @@
if (diff != null) {
changed = true;
- addEnforcers(op, i, diff, rqd, delivered, childrenDomain, nestedPlan, context);
+ addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context);
- AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(i).getValue());
+ AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex).getValue());
if (newChild != child) {
delivered = newChild.getDeliveredPhysicalProperties();
@@ -242,8 +286,8 @@
break;
}
}
-
}
+
if (firstDeliveredPartitioning == null) {
IPartitioningProperty dpp = delivered.getPartitioningProperty();
if (dpp.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED
@@ -251,8 +295,6 @@
firstDeliveredPartitioning = dpp;
}
}
-
- i++;
}
if (op.hasNestedPlans()) {
@@ -279,7 +321,6 @@
// Now, transfer annotations from the original sort op. to this one.
AbstractLogicalOperator transferTo = nextOp;
if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
- //
// remove duplicate exchange operator
transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
}
@@ -598,7 +639,6 @@
}
return ordCols;
}
-
}
private void setNewOp(Mutable<ILogicalOperator> opRef, AbstractLogicalOperator newOp, IOptimizationContext context)