Improved index NL joins on primary btree indexes by requiring an unordered partitioning property instead of a broadcast.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@839 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index dad8d0a..0486060 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -11,6 +11,7 @@
import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -23,8 +24,18 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -33,8 +44,16 @@
*/
public class BTreeSearchPOperator extends IndexSearchPOperator {
- public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
+ private final List<LogicalVariable> lowKeyVarList;
+ private final List<LogicalVariable> highKeyVarList;
+ private boolean isPrimaryIndex;
+
+ public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
+ boolean isPrimaryIndex, List<LogicalVariable> lowKeyVarList, List<LogicalVariable> highKeyVarList) {
super(idx, requiresBroadcast);
+ this.isPrimaryIndex = isPrimaryIndex;
+ this.lowKeyVarList = lowKeyVarList;
+ this.highKeyVarList = highKeyVarList;
}
@Override
@@ -79,4 +98,31 @@
ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue();
builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
}
+
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ if (requiresBroadcast) {
+ if (isPrimaryIndex) {
+ // For primary indexes, we require re-partitioning on the primary key, and not a broadcast.
+ // Also, add a local sorting property to enforce a sort before the primary-index operator.
+ StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+ ListSet<LogicalVariable> searchKeyVars = new ListSet<LogicalVariable>();
+ searchKeyVars.addAll(lowKeyVarList);
+ searchKeyVars.addAll(highKeyVarList);
+ List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+ for (LogicalVariable orderVar : searchKeyVars) {
+ propsLocal.add(new LocalOrderProperty(new OrderColumn(orderVar, OrderKind.ASC)));
+ }
+ pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(searchKeyVars, null),
+ propsLocal);
+ return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ } else {
+ StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+ pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+ return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+ } else {
+ return super.getRequiredPropertiesForChildren(op, reqdByParent);
+ }
+ }
}
\ No newline at end of file
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java
index 65225ca..f6c926b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -23,8 +23,8 @@
*/
public abstract class IndexSearchPOperator extends AbstractScanPOperator {
- private final IDataSourceIndex<String, AqlSourceId> idx;
- private final boolean requiresBroadcast;
+ protected final IDataSourceIndex<String, AqlSourceId> idx;
+ protected final boolean requiresBroadcast;
public IndexSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
this.idx = idx;
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 0f28239..5c61715 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -203,6 +203,8 @@
physicalRewritesAllLevels.add(new PullPositionalVariableFromUnnestRule());
physicalRewritesAllLevels.add(new PushProjectDownRule());
physicalRewritesAllLevels.add(new InsertProjectBeforeUnionRule());
+ // After adding projects, we may need need to set physical operators again.
+ physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
return physicalRewritesAllLevels;
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 74f790f..f57cfb4 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -14,6 +14,7 @@
import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -161,7 +162,11 @@
boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
switch (indexType) {
case BTREE: {
- op.setPhysicalOperator(new BTreeSearchPOperator(dsi, requiresBroadcast));
+ BTreeJobGenParams btreeJobGenParams = new BTreeJobGenParams();
+ btreeJobGenParams.readFromFuncArgs(f.getArguments());
+ op.setPhysicalOperator(new BTreeSearchPOperator(dsi, requiresBroadcast,
+ btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.getLowKeyVarList(),
+ btreeJobGenParams.getHighKeyVarList()));
break;
}
case RTREE: {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
index af30163..93099e9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
@@ -23,7 +23,8 @@
protected String datasetName;
protected boolean retainInput;
protected boolean requiresBroadcast;
-
+ protected boolean isPrimaryIndex;
+
private final int NUM_PARAMS = 5;
public AccessMethodJobGenParams() {
@@ -36,6 +37,7 @@
this.datasetName = datasetName;
this.retainInput = retainInput;
this.requiresBroadcast = requiresBroadcast;
+ this.isPrimaryIndex = datasetName.equals(indexName);
}
public void writeToFuncArgs(List<Mutable<ILogicalExpression>> funcArgs) {
@@ -52,6 +54,7 @@
datasetName = AccessMethodUtils.getStringConstant(funcArgs.get(2));
retainInput = AccessMethodUtils.getBooleanConstant(funcArgs.get(3));
requiresBroadcast = AccessMethodUtils.getBooleanConstant(funcArgs.get(4));
+ isPrimaryIndex = datasetName.equals(indexName);
}
public String getIndexName() {
@@ -100,4 +103,8 @@
protected int getNumParams() {
return NUM_PARAMS;
}
+
+ public boolean isPrimaryIndex() {
+ return isPrimaryIndex;
+ }
}
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan
index 80eec0d..b9b830e 100644
--- a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan
@@ -4,9 +4,12 @@
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
-- BTREE_SEARCH |UNPARTITIONED|
- -- BROADCAST_EXCHANGE |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STABLE_SORT [$$11(ASC)] |LOCAL|
+ -- HASH_PARTITION_EXCHANGE [$$11] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan
index 80eec0d..d11dc6a 100644
--- a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan
@@ -4,9 +4,12 @@
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
-- BTREE_SEARCH |UNPARTITIONED|
- -- BROADCAST_EXCHANGE |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |LOCAL|
+ -- STABLE_SORT [$$10(ASC)] |LOCAL|
+ -- HASH_PARTITION_EXCHANGE [$$10] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan
index 02f36b5..a2e27c9 100644
--- a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan
@@ -4,7 +4,7 @@
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
-- BTREE_SEARCH |UNPARTITIONED|
- -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|