[ASTERIXDB-3144][RT] Make hash exchanges consider partitions map
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Some operators (e.g. INSERT and UPSERT) require their input to be
hash partitioned according to a partitions map. This patch makes the
hash exchanges satisfy that requirement.
Hash exchanges now take an optional partitions map to use when
hash partitioning.
- Make sure the partitions map is considered when comparing
  partitioning properties.
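For context, a minimal standalone sketch (an assumption of this note, not
part of the patch: plain Java, outside the Hyracks class path) of the
int[][] partitions map these APIs pass around. It mirrors the identity map
built by the IntersectPOperator.getPartitionsMap(int) helper added below;
a map of this shape is what UnorderedPartitionedProperty.ofPartitionsMap(...),
HashPartitionExchangePOperator, and FieldHashPartitionComputerFactory now accept.

    // Hedged sketch, not part of this patch: builds the identity partitions map
    // (entry i holds only partition i), the same shape produced by
    // IntersectPOperator.getPartitionsMap(int) and consumed by
    // UnorderedPartitionedProperty.ofPartitionsMap(vars, domain, partitionsMap).
    import java.util.Arrays;

    public class PartitionsMapSketch {
        static int[][] identityPartitionsMap(int numPartitions) {
            int[][] map = new int[numPartitions][];
            for (int i = 0; i < numPartitions; i++) {
                map[i] = new int[] { i }; // entry i maps to partition i only
            }
            return map;
        }

        public static void main(String[] args) {
            // prints [[0], [1], [2], [3]]
            System.out.println(Arrays.deepToString(identityPartitionsMap(4)));
        }
    }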
Change-Id: I71457603048e9be9467943918e21ce5ede658c19
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17489
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index d56963e..8865bb2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -216,7 +216,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
if (requiresBroadcast) {
// For primary indexes optimizing an equality condition we can reduce the broadcast requirement to hash partitioning.
if (isPrimaryIndex && isEqCondition) {
@@ -239,7 +239,11 @@
orderColumns.add(new OrderColumn(orderVar, OrderKind.ASC));
}
propsLocal.add(new LocalOrderProperty(orderColumns));
- pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(searchKeyVars, domain),
+ MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
+ Dataset dataset = mp.findDataset(searchIndex.getDataverseName(), searchIndex.getDatasetName());
+ int[][] partitionsMap = mp.getPartitionsMap(dataset);
+ pv[0] = new StructuralPropertiesVector(
+ UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars, domain, partitionsMap),
propsLocal);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
index 9917589..90be495 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
@@ -112,7 +112,7 @@
dataset.getDatasetDetails(), context.getComputationNodeDomain());
IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
AbstractScanOperator as = (AbstractScanOperator) op;
- deliveredProperties = dspp.computeDeliveredProperties(as.getVariables());
+ deliveredProperties = dspp.computeDeliveredProperties(as.getVariables(), context);
}
@Override
@@ -156,7 +156,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
if (requiresBroadcast) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
index d688ab8..f522d93 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -23,6 +23,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -65,11 +66,12 @@
}
@Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
IDataSource<?> ds = idx.getDataSource();
IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
AbstractScanOperator as = (AbstractScanOperator) op;
- deliveredProperties = dspp.computeDeliveredProperties(as.getScanVariables());
+ deliveredProperties = dspp.computeDeliveredProperties(as.getScanVariables(), context);
}
protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[] inputSchemas) {
@@ -85,7 +87,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
if (requiresBroadcast) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(domain), null);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
index c0d9f4b..513b5aa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
@@ -121,9 +121,9 @@
keysLeftBranchTileId.add(keysLeftBranch.get(0));
List<LogicalVariable> keysRightBranchTileId = new ArrayList<>();
keysRightBranchTileId.add(keysRightBranch.get(0));
- IPartitioningProperty pp1 = new UnorderedPartitionedProperty(new ListSet<>(keysLeftBranchTileId),
+ IPartitioningProperty pp1 = UnorderedPartitionedProperty.of(new ListSet<>(keysLeftBranchTileId),
context.getComputationNodeDomain());
- IPartitioningProperty pp2 = new UnorderedPartitionedProperty(new ListSet<>(keysRightBranchTileId),
+ IPartitioningProperty pp2 = UnorderedPartitionedProperty.of(new ListSet<>(keysRightBranchTileId),
context.getComputationNodeDomain());
List<ILocalStructuralProperty> localProperties1 = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
index fc8c3e9..bb0bddb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
@@ -340,7 +340,7 @@
inputCompareVars.add(new ArrayList<>(intersect.getInputCompareVariables(i)));
}
IntersectOperator intersectWithFilter = new IntersectOperator(intersect.getOutputCompareVariables(),
- outputFilterVars, inputCompareVars, filterVars);
+ outputFilterVars, inputCompareVars, filterVars, intersect.getPartitionsMap());
intersectWithFilter.setSourceLocation(intersect.getSourceLocation());
intersectWithFilter.getInputs().addAll(intersect.getInputs());
return intersectWithFilter;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index f021845..43e482b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -284,7 +284,8 @@
subRoots.add(subRoot);
}
// Connect each secondary index utilization plan to a common intersect operator.
- ILogicalOperator primaryUnnestOp = connectAll2ndarySearchPlanWithIntersect(subRoots, context);
+ Index idx = chosenIndexes.get(0).getSecond();
+ ILogicalOperator primaryUnnestOp = connectAll2ndarySearchPlanWithIntersect(subRoots, context, idx);
subTree.getDataSourceRef().setValue(primaryUnnestOp);
return primaryUnnestOp != null;
@@ -312,7 +313,7 @@
* Connect each secondary index utilization plan to a common INTERSECT operator.
*/
private ILogicalOperator connectAll2ndarySearchPlanWithIntersect(List<ILogicalOperator> subRoots,
- IOptimizationContext context) throws AlgebricksException {
+ IOptimizationContext context, Index idx) throws AlgebricksException {
ILogicalOperator lop = subRoots.get(0);
List<List<LogicalVariable>> inputVars = new ArrayList<>(subRoots.size());
for (int i = 0; i < subRoots.size(); i++) {
@@ -360,7 +361,8 @@
VariableUtilities.substituteVariables(lop, inputVar, outputVar, context);
}
- IntersectOperator intersect = new IntersectOperator(outputVars, inputVars);
+ int[][] partitionsMap = metadataProvider.getPartitionsMap(idx);
+ IntersectOperator intersect = new IntersectOperator(outputVars, inputVars, partitionsMap);
intersect.setSourceLocation(lop.getSourceLocation());
for (ILogicalOperator secondarySearch : subRoots) {
intersect.getInputs().add(secondarySearch.getInputs().get(0));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index fda3845..de493ac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
@@ -114,12 +115,14 @@
public IDataSourcePropertiesProvider getPropertiesProvider() {
return new IDataSourcePropertiesProvider() {
@Override
- public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables) {
+ public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+ IOptimizationContext ctx) {
return StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
}
@Override
- public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables) {
+ public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+ IOptimizationContext ctx) {
List<ILocalStructuralProperty> propsLocal = new ArrayList<>(1);
//TODO(ali): consider primary keys?
List<OrderColumn> secKeys = new ArrayList<>(numSecKeys);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index b0de85e..bc9e6d3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -193,7 +193,7 @@
ExchangeOperator exchangeOperator1 = new ExchangeOperator();
exchangeOperator1.setExecutionMode(PARTITIONED);
- exchangeOperator1.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ exchangeOperator1.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
exchangeOperator1.getInputs().add(new MutableObject<>(firstJoin));
EmptyTupleSourceOperator ets3 = new EmptyTupleSourceOperator();
@@ -207,7 +207,7 @@
ExchangeOperator exchangeOperator2 = new ExchangeOperator();
exchangeOperator2.setExecutionMode(PARTITIONED);
- exchangeOperator2.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ exchangeOperator2.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
exchangeOperator2.getInputs().add(new MutableObject<>(groupByOperator));
LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE),
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
index cc18c31..729a560 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
@@ -51,7 +51,7 @@
// Constructs a parallel group-by query plan.
GroupByOperator globalGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
ExchangeOperator exchange = new ExchangeOperator();
- exchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ exchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
GroupByOperator localGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
globalGby.getInputs().add(new MutableObject<>(exchange));
exchange.getInputs().add(new MutableObject<>(localGby));
@@ -94,7 +94,7 @@
// Left child plan of the join.
ExchangeOperator leftChildExchange = new ExchangeOperator();
leftChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
- leftChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ leftChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
InnerJoinOperator leftChild = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
join.getInputs().add(new MutableObject<>(leftChildExchange));
leftChildExchange.getInputs().add(new MutableObject<>(leftChild));
@@ -106,7 +106,7 @@
// Right child plan of the join.
ExchangeOperator rightChildExchange = new ExchangeOperator();
rightChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
- rightChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ rightChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
GroupByOperator rightChild = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
join.getInputs().add(new MutableObject<>(rightChildExchange));
rightChildExchange.getInputs().add(new MutableObject<>(rightChild));
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
index 81873af..f51e474 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
@@ -22,7 +22,10 @@
import java.util.List;
import java.util.Set;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
@@ -47,7 +50,8 @@
}
@Override
- public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables) {
+ public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+ IOptimizationContext ctx) throws AlgebricksException {
IPhysicalPropertiesVector propsVector;
IPartitioningProperty pp;
List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
@@ -58,12 +62,23 @@
ds.computeLocalStructuralProperties(propsLocal, scanVariables);
break;
case DataSource.Type.FEED:
- pp = getFeedPartitioningProperty(ds, domain, scanVariables);
+ String dsName = ((FeedDataSource) ds).getTargetDataset();
+ Dataset feedDs = ((MetadataProvider) ctx.getMetadataProvider())
+ .findDataset(ds.getId().getDataverseName(), dsName);
+ int[][] partitionsMap1 = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(feedDs);
+ pp = getFeedDatasetPartitioningProperty(ds, domain, scanVariables, partitionsMap1);
break;
case DataSource.Type.INTERNAL_DATASET:
case DataSource.Type.SAMPLE:
Set<LogicalVariable> pvars = new ListSet<>();
- pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars);
+ Dataset dataset;
+ if (ds.getDatasourceType() == DataSource.Type.INTERNAL_DATASET) {
+ dataset = ((DatasetDataSource) ds).getDataset();
+ } else {
+ dataset = ((SampleDataSource) ds).getDataset();
+ }
+ int[][] partitionsMap = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(dataset);
+ pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars, partitionsMap);
propsLocal.add(new LocalOrderProperty(getOrderColumns(pvars)));
break;
default:
@@ -74,14 +89,22 @@
}
@Override
- public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables) {
- if (ds.getDatasourceType() == DataSource.Type.INTERNAL_DATASET) {
- IPartitioningProperty pp = new RandomPartitioningProperty(domain);
- List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
- ds.computeLocalStructuralProperties(propsLocal, scanVariables);
- return new StructuralPropertiesVector(pp, propsLocal);
+ public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+ IOptimizationContext ctx) throws AlgebricksException {
+ switch (ds.getDatasourceType()) {
+ case DataSource.Type.INTERNAL_DATASET: {
+ IPartitioningProperty pp = new RandomPartitioningProperty(domain);
+ List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
+ ds.computeLocalStructuralProperties(propsLocal, scanVariables);
+ return new StructuralPropertiesVector(pp, propsLocal);
+ }
+ case DataSource.Type.FEED: {
+ IPartitioningProperty pp = getFeedPartitioningProperty(ds, domain, scanVariables);
+ return new StructuralPropertiesVector(pp, new ArrayList<>());
+ }
+ default:
+ return computeRequiredProperties(scanVariables, ctx);
}
- return computeRequiredProperties(scanVariables);
}
private static List<OrderColumn> getOrderColumns(Set<LogicalVariable> pvars) {
@@ -93,13 +116,26 @@
}
private static IPartitioningProperty getInternalDatasetPartitioningProperty(DataSource ds, INodeDomain domain,
- List<LogicalVariable> scanVariables, Set<LogicalVariable> pvars) {
+ List<LogicalVariable> scanVariables, Set<LogicalVariable> pvars, int[][] partitionsMap) {
IPartitioningProperty pp;
if (scanVariables.size() < 2) {
pp = new RandomPartitioningProperty(domain);
} else {
pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
- pp = new UnorderedPartitionedProperty(pvars, domain);
+ pp = UnorderedPartitionedProperty.ofPartitionsMap(pvars, domain, partitionsMap);
+ }
+ return pp;
+ }
+
+ public static IPartitioningProperty getFeedDatasetPartitioningProperty(DataSource ds, INodeDomain domain,
+ List<LogicalVariable> scanVariables, int[][] partitionsMap) {
+ IPartitioningProperty pp;
+ if (scanVariables.size() < 2) {
+ pp = new RandomPartitioningProperty(domain);
+ } else {
+ Set<LogicalVariable> pvars = new ListSet<>();
+ pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
+ pp = UnorderedPartitionedProperty.ofPartitionsMap(pvars, domain, partitionsMap);
}
return pp;
}
@@ -112,7 +148,7 @@
} else {
Set<LogicalVariable> pvars = new ListSet<>();
pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
- pp = new UnorderedPartitionedProperty(pvars, domain);
+ pp = UnorderedPartitionedProperty.of(pvars, domain);
}
return pp;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 9f7d567..2c57162 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -86,12 +87,14 @@
// Unordered Random partitioning on all nodes
return new IDataSourcePropertiesProvider() {
@Override
- public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables) {
+ public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+ IOptimizationContext ctx) {
return StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
}
@Override
- public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables) {
+ public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+ IOptimizationContext ctx) {
return new StructuralPropertiesVector(new RandomPartitioningProperty(domain), Collections.emptyList());
}
};
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 00fe7ae..7383ca3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1887,6 +1887,18 @@
validateDatabaseObjectNameImpl(objectName, sourceLoc);
}
+ public int[][] getPartitionsMap(Dataset dataset) throws AlgebricksException {
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(dataset);
+ return getPartitionsMap(getNumPartitions(spPc.second));
+ }
+
+ public int[][] getPartitionsMap(Index idx) throws AlgebricksException {
+ Dataset ds = findDataset(idx.getDataverseName(), idx.getDatasetName());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+ getSplitProviderAndConstraints(ds, idx.getIndexName());
+ return getPartitionsMap(getNumPartitions(spPc.second));
+ }
+
public static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
return ((AlgebricksCountPartitionConstraint) constraint).getCount();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index 448f5ce..c1858af 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -72,6 +72,10 @@
return false;
}
+ public Dataset getDataset() {
+ return dataset;
+ }
+
private static DataSourceId createSampleDataSourceId(Dataset dataset, String sampleIndexName) {
return new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName(), new String[] { sampleIndexName });
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java
index 5086e32..3b3bc5d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java
@@ -20,21 +20,25 @@
import java.util.List;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
public interface IDataSourcePropertiesProvider {
/**
- *
* @param scanVariables
+ * @param ctx
* @return
*/
- IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables);
+ IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables, IOptimizationContext ctx)
+ throws AlgebricksException;
/**
- *
* @param scanVariables
+ * @param ctx
* @return
*/
- IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables);
+ IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables, IOptimizationContext ctx)
+ throws AlgebricksException;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
index f77d9db..82bf431 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
@@ -44,15 +44,16 @@
private final List<LogicalVariable> outputExtraVars;
private final List<List<LogicalVariable>> inputExtraVars;
+ private final int[][] partitionsMap;
- public IntersectOperator(List<LogicalVariable> outputCompareVars, List<List<LogicalVariable>> inputCompareVars)
- throws AlgebricksException {
- this(outputCompareVars, Collections.emptyList(), inputCompareVars, Collections.emptyList());
+ public IntersectOperator(List<LogicalVariable> outputCompareVars, List<List<LogicalVariable>> inputCompareVars,
+ int[][] partitionsMap) throws AlgebricksException {
+ this(outputCompareVars, Collections.emptyList(), inputCompareVars, Collections.emptyList(), partitionsMap);
}
public IntersectOperator(List<LogicalVariable> outputCompareVars, List<LogicalVariable> outputExtraVars,
- List<List<LogicalVariable>> inputCompareVars, List<List<LogicalVariable>> inputExtraVars)
- throws AlgebricksException {
+ List<List<LogicalVariable>> inputCompareVars, List<List<LogicalVariable>> inputExtraVars,
+ int[][] partitionsMap) throws AlgebricksException {
int numCompareVars = outputCompareVars.size();
for (List<LogicalVariable> vars : inputCompareVars) {
if (vars.size() != numCompareVars) {
@@ -75,6 +76,7 @@
}
}
+ this.partitionsMap = partitionsMap;
this.outputCompareVars = new ArrayList<>(outputCompareVars);
this.inputCompareVars = new ArrayList<>(inputCompareVars);
this.outputExtraVars = new ArrayList<>();
@@ -174,6 +176,10 @@
return outputExtraVars;
}
+ public int[][] getPartitionsMap() {
+ return partitionsMap;
+ }
+
private List<LogicalVariable> concatOutputVariables() {
return ListUtils.union(outputCompareVars, outputExtraVars);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 5a6574a..dfd6398 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -77,6 +78,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -384,9 +386,9 @@
if (op.getOperatorTag() != LogicalOperatorTag.INTERSECT) {
return Boolean.FALSE;
}
- IntersectOperator intersetOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg);
+ IntersectOperator intersectOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg);
List<LogicalVariable> outputCompareVars = op.getOutputCompareVariables();
- List<LogicalVariable> outputCompareVarsArg = intersetOpArg.getOutputCompareVariables();
+ List<LogicalVariable> outputCompareVarsArg = intersectOpArg.getOutputCompareVariables();
if (outputCompareVars.size() != outputCompareVarsArg.size()) {
return Boolean.FALSE;
}
@@ -395,7 +397,7 @@
}
boolean hasExtraVars = op.hasExtraVariables();
List<LogicalVariable> outputExtraVars = op.getOutputExtraVariables();
- List<LogicalVariable> outputExtraVarsArg = intersetOpArg.getOutputExtraVariables();
+ List<LogicalVariable> outputExtraVarsArg = intersectOpArg.getOutputExtraVariables();
if (outputExtraVars.size() != outputExtraVarsArg.size()) {
return Boolean.FALSE;
}
@@ -404,19 +406,22 @@
}
int nInput = op.getNumInput();
- if (nInput != intersetOpArg.getNumInput()) {
+ if (nInput != intersectOpArg.getNumInput()) {
return Boolean.FALSE;
}
for (int i = 0; i < nInput; i++) {
if (!VariableUtilities.varListEqualUnordered(op.getInputCompareVariables(i),
- intersetOpArg.getInputCompareVariables(i))) {
+ intersectOpArg.getInputCompareVariables(i))) {
return Boolean.FALSE;
}
if (hasExtraVars && !VariableUtilities.varListEqualUnordered(op.getInputExtraVariables(i),
- intersetOpArg.getInputExtraVariables(i))) {
+ intersectOpArg.getInputExtraVariables(i))) {
return Boolean.FALSE;
}
}
+ if (!Arrays.deepEquals(op.getPartitionsMap(), intersectOpArg.getPartitionsMap())) {
+ return Boolean.FALSE;
+ }
return Boolean.TRUE;
}
@@ -543,9 +548,15 @@
if (!partProp.getNodeDomain().sameAs(partPropArg.getNodeDomain())) {
return Boolean.FALSE;
}
- List<LogicalVariable> columns = new ArrayList<LogicalVariable>();
+ if (partProp.getPartitioningType() == IPartitioningProperty.PartitioningType.UNORDERED_PARTITIONED) {
+ if (!((UnorderedPartitionedProperty) partProp)
+ .samePartitioningScheme(((UnorderedPartitionedProperty) partPropArg))) {
+ return Boolean.FALSE;
+ }
+ }
+ List<LogicalVariable> columns = new ArrayList<>();
partProp.getColumns(columns);
- List<LogicalVariable> columnsArg = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> columnsArg = new ArrayList<>();
partPropArg.getColumns(columnsArg);
if (columns.size() != columnsArg.size()) {
return Boolean.FALSE;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 71a659b..b49fbcc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -523,8 +524,12 @@
inputExtraVarsCopy.add(deepCopyVariableList(op.getInputExtraVariables(i)));
}
}
+ int[][] partitionsMap = op.getPartitionsMap();
+ int[][] partitionsMapCopy =
+ partitionsMap == null ? null : Arrays.stream(partitionsMap).map(int[]::clone).toArray(int[][]::new);
+
IntersectOperator opCopy = new IntersectOperator(outputCompareVarsCopy, outputExtraVarsCopy,
- inputCompareVarsCopy, inputExtraVarsCopy);
+ inputCompareVarsCopy, inputExtraVarsCopy, partitionsMapCopy);
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index b7029d1..5112812 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -20,6 +20,7 @@
package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -230,7 +231,11 @@
newInputExtraVars.add(new ArrayList<>(op.getInputExtraVariables(i)));
}
}
- return new IntersectOperator(newOutputCompareVars, newOutputExtraVars, newInputCompareVars, newInputExtraVars);
+ int[][] partitionsMap = op.getPartitionsMap();
+ int[][] partitionsMapCopy =
+ partitionsMap == null ? null : Arrays.stream(partitionsMap).map(int[]::clone).toArray(int[][]::new);
+ return new IntersectOperator(newOutputCompareVars, newOutputExtraVars, newInputCompareVars, newInputExtraVars,
+ partitionsMapCopy);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 38ccee5..a594e7e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -94,21 +94,19 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext ctx) {
// In a cost-based optimizer, we would also try to propagate the
// parent's partitioning requirements.
IPartitioningProperty pp1;
IPartitioningProperty pp2;
switch (partitioningType) {
case PAIRWISE:
- pp1 = new UnorderedPartitionedProperty(new ListSet<>(keysLeftBranch),
- context.getComputationNodeDomain());
- pp2 = new UnorderedPartitionedProperty(new ListSet<>(keysRightBranch),
- context.getComputationNodeDomain());
+ pp1 = UnorderedPartitionedProperty.of(new ListSet<>(keysLeftBranch), ctx.getComputationNodeDomain());
+ pp2 = UnorderedPartitionedProperty.of(new ListSet<>(keysRightBranch), ctx.getComputationNodeDomain());
break;
case BROADCAST:
- pp1 = new RandomPartitioningProperty(context.getComputationNodeDomain());
- pp2 = new BroadcastPartitioningProperty(context.getComputationNodeDomain());
+ pp1 = new RandomPartitioningProperty(ctx.getComputationNodeDomain());
+ pp2 = new BroadcastPartitioningProperty(ctx.getComputationNodeDomain());
break;
default:
throw new IllegalStateException();
@@ -141,9 +139,9 @@
(UnorderedPartitionedProperty) firstDeliveredPartitioning;
Set<LogicalVariable> set1 = upp1.getColumnSet();
UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) requirements;
- Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> modifuppreq = new ListSet<>();
Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
- Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> covered = new ListSet<>();
Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent))
? keysRightBranch : keysLeftBranch;
@@ -182,8 +180,14 @@
+ " to agree with partitioning property " + firstDeliveredPartitioning
+ " delivered by previous input operator.");
}
- UnorderedPartitionedProperty upp2 =
- new UnorderedPartitionedProperty(modifuppreq, requirements.getNodeDomain());
+ UnorderedPartitionedProperty upp2;
+ UnorderedPartitionedProperty rqd = (UnorderedPartitionedProperty) requirements;
+ if (rqd.usesPartitionsMap()) {
+ upp2 = UnorderedPartitionedProperty.ofPartitionsMap(modifuppreq,
+ rqd.getNodeDomain(), rqd.getPartitionsMap());
+ } else {
+ upp2 = UnorderedPartitionedProperty.of(modifuppreq, rqd.getNodeDomain());
+ }
return new Pair<>(false, upp2);
}
case ORDERED_PARTITIONED: {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
index a81bf97..2f02a61 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
@@ -65,7 +65,7 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
- pp = new UnorderedPartitionedProperty(new ListSet<>(columnList), context.getComputationNodeDomain());
+ pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList), context.getComputationNodeDomain());
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index d2cc5d9..969fd99 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -230,7 +230,7 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
- pp = new UnorderedPartitionedProperty(new ListSet<>(columnList), context.getComputationNodeDomain());
+ pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList), context.getComputationNodeDomain());
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
index 5159ac5..05b441c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -27,7 +28,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
return emptyUnaryRequirements();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index 560435e..fcc8c8e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -79,7 +79,7 @@
IPartitioningProperty pp;
switch (op.getExecutionMode()) {
case PARTITIONED:
- pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
+ pp = UnorderedPartitionedProperty.of(new ListSet<>(partitionColumns),
context.getComputationNodeDomain());
break;
case UNPARTITIONED:
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 2543330..682d1cf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -72,12 +72,12 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
List<LogicalVariable> scanVariables = new ArrayList<>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
IPhysicalPropertiesVector physicalProps =
- dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables);
+ dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables, context);
StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
physicalProps.getLocalProperties());
return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
index 80843a4..ea19a78 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
@@ -74,11 +74,12 @@
}
@Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
// partitioning properties
DataSourceScanOperator dssOp = (DataSourceScanOperator) op;
IDataSourcePropertiesProvider dspp = dataSource.getPropertiesProvider();
- deliveredProperties = dspp.computeDeliveredProperties(dssOp.getVariables());
+ deliveredProperties = dspp.computeDeliveredProperties(dssOp.getVariables(), context);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 7515258..89e17ad 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -108,8 +108,9 @@
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
- pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(
- new ListSet<LogicalVariable>(columnList), context.getComputationNodeDomain()), null);
+ pv[0] = new StructuralPropertiesVector(
+ UnorderedPartitionedProperty.of(new ListSet<>(columnList), context.getComputationNodeDomain()),
+ null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 91dba24..55b40b4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -47,12 +47,14 @@
public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
- private List<LogicalVariable> hashFields;
- private INodeDomain domain;
+ private final List<LogicalVariable> hashFields;
+ private final INodeDomain domain;
+ private final int[][] partitionsMap;
- public HashPartitionExchangePOperator(List<LogicalVariable> hashFields, INodeDomain domain) {
+ public HashPartitionExchangePOperator(List<LogicalVariable> hashFields, INodeDomain domain, int[][] partitionsMap) {
this.hashFields = hashFields;
this.domain = domain;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -70,7 +72,12 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(hashFields), domain);
+ IPartitioningProperty p;
+ if (partitionsMap != null) {
+ p = UnorderedPartitionedProperty.ofPartitionsMap(new ListSet<>(hashFields), domain, partitionsMap);
+ } else {
+ p = UnorderedPartitionedProperty.of(new ListSet<>(hashFields), domain);
+ }
this.deliveredProperties = new StructuralPropertiesVector(p, null);
}
@@ -98,9 +105,13 @@
hashFunctionFactories[i] = hashFunProvider.getBinaryHashFunctionFactory(env.getVarType(v));
++i;
}
- ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
+ ITuplePartitionComputerFactory tpcf =
+ new FieldHashPartitionComputerFactory(keys, hashFunctionFactories, partitionsMap);
IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
return new Pair<>(conn, null);
}
+ public int[][] getPartitionsMap() {
+ return partitionsMap;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index c5ce871..5861464 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -63,12 +63,14 @@
private final List<OrderColumn> orderColumns;
private final List<LogicalVariable> partitionFields;
private final INodeDomain domain;
+ private final int[][] partitionsMap;
public HashPartitionMergeExchangePOperator(List<OrderColumn> orderColumns, List<LogicalVariable> partitionFields,
- INodeDomain domain) {
+ INodeDomain domain, int[][] partitionsMap) {
this.orderColumns = orderColumns;
this.partitionFields = partitionFields;
this.domain = domain;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -82,11 +84,15 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p =
- new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(partitionFields), domain);
+ IPartitioningProperty p;
+ if (partitionsMap != null) {
+ p = UnorderedPartitionedProperty.ofPartitionsMap(new ListSet<>(partitionFields), domain, partitionsMap);
+ } else {
+ p = UnorderedPartitionedProperty.of(new ListSet<>(partitionFields), domain);
+ }
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
- List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
+ List<ILocalStructuralProperty> locals = new ArrayList<>();
for (ILocalStructuralProperty prop : op2Locals) {
if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
locals.add(prop);
@@ -133,7 +139,8 @@
++i;
}
}
- ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
+ ITuplePartitionComputerFactory tpcf =
+ new FieldHashPartitionComputerFactory(keys, hashFunctionFactories, partitionsMap);
int n = orderColumns.size();
int[] sortFields = new int[n];
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index aee268b..50bdf8a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -84,14 +84,14 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
//skVarMap is used to remove duplicated variable references for order operator
Map<Integer, Object> skVarMap = new HashMap<Integer, Object>();
List<LogicalVariable> scanVariables = new ArrayList<>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
- IPhysicalPropertiesVector physicalProps =
- dataSourceIndex.getDataSource().getPropertiesProvider().computeRequiredProperties(scanVariables);
+ IPhysicalPropertiesVector physicalProps = dataSourceIndex.getDataSource().getPropertiesProvider()
+ .computeRequiredProperties(scanVariables, context);
List<ILocalStructuralProperty> localProperties = new ArrayList<>();
List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
// Data needs to be sorted based on the [token, number of token, PK]
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 0386275..c4f912a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -101,15 +101,15 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
for (int i = 0; i < numOfAdditionalNonFilteringFields; i++) {
scanVariables.add(new LogicalVariable(-1));
}
- IPhysicalPropertiesVector r =
- dataSourceIndex.getDataSource().getPropertiesProvider().computeRequiredProperties(scanVariables);
+ IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
+ .computeRequiredProperties(scanVariables, context);
r.getLocalProperties().clear();
IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
requirements[0] = r;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 7527fa6..ab4ee61 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -80,14 +80,15 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(keys);
scanVariables.add(payload);
if (additionalNonFilteringFields != null) {
scanVariables.addAll(additionalNonFilteringFields);
}
- IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables);
+ IPhysicalPropertiesVector r =
+ dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables, context);
r.getLocalProperties().clear();
IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
requirements[0] = r;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index 9a595c5..2b838d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -79,7 +80,11 @@
IPartitioningProperty pp = null;
if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getInputCompareVariables(i));
- pp = new UnorderedPartitionedProperty(partitioningVariables, null);
+ INodeDomain nodeDomain = context.getComputationNodeDomain();
+ int[][] partitionsMap = intersectOp.getPartitionsMap();
+ pp = partitionsMap != null
+ ? UnorderedPartitionedProperty.ofPartitionsMap(partitioningVariables, nodeDomain, partitionsMap)
+ : UnorderedPartitionedProperty.of(partitioningVariables, nodeDomain);
}
pv[i] = new StructuralPropertiesVector(pp, localProps);
}
@@ -173,4 +178,12 @@
public boolean expensiveThanMaterialization() {
return false;
}
+
+ public static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
}
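
The getPartitionsMap helper added above builds an identity mapping in which compute partition i is fed by exactly storage partition i. A minimal standalone sketch of the shape it produces (illustrative only; the class name here is hypothetical, not part of the patch):

    import java.util.Arrays;

    public class IdentityPartitionsMapSketch {
        public static void main(String[] args) {
            int numPartitions = 4;
            int[][] map = new int[numPartitions][];
            for (int i = 0; i < numPartitions; i++) {
                // compute partition i owns exactly one storage partition: i
                map[i] = new int[] { i };
            }
            System.out.println(Arrays.deepToString(map)); // [[0], [1], [2], [3]]
        }
    }
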
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 1e97182..55ba9f9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -80,11 +80,12 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(keys);
scanVariables.add(new LogicalVariable(-1));
- IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables);
+ IPhysicalPropertiesVector r =
+ dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables, context);
IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
requirements[0] = r;
return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
index a49c4b3..d6f4812 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
@@ -43,7 +43,7 @@
@Override
public Pair<Boolean, IPartitioningProperty> coordinateRequirements(IPartitioningProperty requirements,
IPartitioningProperty firstDeliveredPartitioning, ILogicalOperator op, IOptimizationContext context) {
- return new Pair<Boolean, IPartitioningProperty>(true, requirements);
+ return new Pair<>(true, requirements);
}
};
@@ -62,9 +62,9 @@
(UnorderedPartitionedProperty) firstDeliveredPartitioning;
Set<LogicalVariable> set1 = upp1.getColumnSet();
UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) rqdpp;
- Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> modifuppreq = new ListSet<>();
Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
- Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> covered = new ListSet<>();
// coordinate from an existing partition property
// (firstDeliveredPartitioning)
@@ -94,16 +94,22 @@
"The number of variables are not equal in both partitioning sides");
}
- UnorderedPartitionedProperty upp2 =
- new UnorderedPartitionedProperty(modifuppreq, rqdpp.getNodeDomain());
- return new Pair<Boolean, IPartitioningProperty>(false, upp2);
+ UnorderedPartitionedProperty upp2;
+ UnorderedPartitionedProperty rqd = (UnorderedPartitionedProperty) rqdpp;
+ if (rqd.usesPartitionsMap()) {
+ upp2 = UnorderedPartitionedProperty.ofPartitionsMap(modifuppreq,
+ rqd.getNodeDomain(), rqd.getPartitionsMap());
+ } else {
+ upp2 = UnorderedPartitionedProperty.of(modifuppreq, rqd.getNodeDomain());
+ }
+ return new Pair<>(false, upp2);
}
case ORDERED_PARTITIONED: {
throw new NotImplementedException();
}
}
}
- return new Pair<Boolean, IPartitioningProperty>(true, rqdpp);
+ return new Pair<>(true, rqdpp);
}
};
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index 1025e44..fbc97d9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -136,6 +136,9 @@
case UNORDERED_PARTITIONED: {
UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
UnorderedPartitionedProperty ud = (UnorderedPartitionedProperty) dlvd;
+ if (!ur.samePartitioningScheme(ud)) {
+ return false;
+ }
if (mayExpandProperties) {
return (!ud.getColumnSet().isEmpty() && ur.getColumnSet().containsAll(ud.getColumnSet()));
} else {
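
With the samePartitioningScheme check, a delivered UNORDERED_PARTITIONED property can only satisfy a required one when both carry equal partitions maps (or neither carries one); the comparison is Arrays.deepEquals on the two int[][] maps. A small self-contained illustration with hypothetical values:

    import java.util.Arrays;

    public class PartitionsMapEqualitySketch {
        public static void main(String[] args) {
            int[][] delivered = { { 0, 1 }, { 2, 3 } };
            int[][] requiredSame = { { 0, 1 }, { 2, 3 } };
            int[][] requiredOther = { { 0 }, { 1 }, { 2 }, { 3 } };
            System.out.println(Arrays.deepEquals(delivered, requiredSame));  // true  -> schemes match
            System.out.println(Arrays.deepEquals(delivered, requiredOther)); // false -> re-partitioning needed
        }
    }
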
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
index fa8650c..6d4c389 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.core.algebra.properties;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -30,10 +31,22 @@
public final class UnorderedPartitionedProperty extends AbstractGroupingProperty implements IPartitioningProperty {
private INodeDomain domain;
+ private final int[][] partitionsMap;
- public UnorderedPartitionedProperty(Set<LogicalVariable> partitioningVariables, INodeDomain domain) {
+ private UnorderedPartitionedProperty(Set<LogicalVariable> partitioningVariables, INodeDomain domain,
+ int[][] partitionsMap) {
super(partitioningVariables);
this.domain = domain;
+ this.partitionsMap = partitionsMap;
+ }
+
+ public static UnorderedPartitionedProperty of(Set<LogicalVariable> partitioningVariables, INodeDomain domain) {
+ return new UnorderedPartitionedProperty(partitioningVariables, domain, null);
+ }
+
+ public static UnorderedPartitionedProperty ofPartitionsMap(Set<LogicalVariable> partitioningVariables,
+ INodeDomain domain, int[][] partitionsMap) {
+ return new UnorderedPartitionedProperty(partitioningVariables, domain, partitionsMap);
}
@Override
@@ -46,7 +59,7 @@
List<FunctionalDependency> fds) {
Set<LogicalVariable> normalizedColumnSet =
normalizeAndReduceGroupingColumns(columnSet, equivalenceClasses, fds);
- return new UnorderedPartitionedProperty(normalizedColumnSet, domain);
+ return new UnorderedPartitionedProperty(normalizedColumnSet, domain, partitionsMap);
}
@Override
@@ -79,12 +92,23 @@
applied = true;
}
}
- return applied ? new UnorderedPartitionedProperty(newColumnSet, domain) : this;
+ return applied ? new UnorderedPartitionedProperty(newColumnSet, domain, partitionsMap) : this;
}
@Override
public IPartitioningProperty clonePartitioningProperty() {
- return new UnorderedPartitionedProperty(new ListSet<>(columnSet), domain);
+ return new UnorderedPartitionedProperty(new ListSet<>(columnSet), domain, partitionsMap);
}
+ public int[][] getPartitionsMap() {
+ return partitionsMap;
+ }
+
+ public boolean usesPartitionsMap() {
+ return partitionsMap != null;
+ }
+
+ public boolean samePartitioningScheme(UnorderedPartitionedProperty another) {
+ return Arrays.deepEquals(partitionsMap, another.partitionsMap);
+ }
}
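
Call sites now go through the two factories: of(...) for plain hash partitioning and ofPartitionsMap(...) when tuples must land on specific partitions. A hedged fragment of the resulting usage pattern, mirroring the IntersectPOperator change above (the sketch class and the LogicalVariable import path are assumptions, not part of the patch):

    import java.util.Set;

    import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
    import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
    import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;

    final class RequiredPartitioningSketch {
        static UnorderedPartitionedProperty required(Set<LogicalVariable> partitioningVariables, INodeDomain domain,
                int[][] partitionsMap) {
            // With a partitions map the downstream hash exchange must honor it;
            // otherwise fall back to ordinary hash partitioning over the node domain.
            return partitionsMap != null
                    ? UnorderedPartitionedProperty.ofPartitionsMap(partitioningVariables, domain, partitionsMap)
                    : UnorderedPartitionedProperty.of(partitioningVariables, domain);
        }
    }
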
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 3ae1218..4d32d97 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -654,31 +654,24 @@
private IPhysicalOperator createHashConnector(IOptimizationContext ctx, IPhysicalPropertiesVector deliveredByChild,
INodeDomain domain, IPhysicalPropertiesVector requiredAtChild, IPartitioningProperty rqdPartitioning,
int childIndex, ILogicalOperator parentOp) {
- IPhysicalOperator hashConnector;
- List<LogicalVariable> vars = new ArrayList<>(((UnorderedPartitionedProperty) rqdPartitioning).getColumnSet());
+ UnorderedPartitionedProperty rqd = (UnorderedPartitionedProperty) rqdPartitioning;
+ List<LogicalVariable> vars = new ArrayList<>(rqd.getColumnSet());
String hashMergeHint = (String) ctx.getMetadataProvider().getConfig().get(HASH_MERGE);
if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
- hashConnector = new HashPartitionExchangePOperator(vars, domain);
- return hashConnector;
+ return new HashPartitionExchangePOperator(vars, domain, rqd.getPartitionsMap());
}
List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
List<ILocalStructuralProperty> reqdLocals = requiredAtChild.getLocalProperties();
- boolean propWasSet = false;
- hashConnector = null;
if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
AbstractLogicalOperator c = (AbstractLogicalOperator) parentOp.getInputs().get(childIndex).getValue();
Map<LogicalVariable, EquivalenceClass> ecs = ctx.getEquivalenceClassMap(c);
List<FunctionalDependency> fds = ctx.getFDList(c);
if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals);
- hashConnector = new HashPartitionMergeExchangePOperator(orderColumns, vars, domain);
- propWasSet = true;
+ return new HashPartitionMergeExchangePOperator(orderColumns, vars, domain, rqd.getPartitionsMap());
}
}
- if (!propWasSet) {
- hashConnector = new HashPartitionExchangePOperator(vars, domain);
- }
- return hashConnector;
+ return new HashPartitionExchangePOperator(vars, domain, rqd.getPartitionsMap());
}
/**
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
index 1b02ab4..15ba92d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
@@ -94,23 +94,24 @@
// If yes, we use HashMergeExchange; otherwise, we use HashExchange.
SortMergeExchangePOperator sme = (SortMergeExchangePOperator) currentOp.getPhysicalOperator();
HashPartitionExchangePOperator hpe = (HashPartitionExchangePOperator) op1.getPhysicalOperator();
- Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+ Set<LogicalVariable> liveVars = new HashSet<>();
VariableUtilities.getLiveVariables(op1, liveVars);
boolean usingHashMergeExchange = true;
for (OrderColumn oc : sme.getSortColumns()) {
if (!liveVars.contains(oc.getColumn())) {
usingHashMergeExchange = false;
+ break;
}
}
if (usingHashMergeExchange) {
// Add sort columns from the SortMergeExchange into a new HashMergeExchange.
- List<OrderColumn> ocList = new ArrayList<OrderColumn>();
+ List<OrderColumn> ocList = new ArrayList<>();
for (OrderColumn oc : sme.getSortColumns()) {
ocList.add(oc);
}
- HashPartitionMergeExchangePOperator hpme =
- new HashPartitionMergeExchangePOperator(ocList, hpe.getHashFields(), hpe.getDomain());
+ HashPartitionMergeExchangePOperator hpme = new HashPartitionMergeExchangePOperator(ocList,
+ hpe.getHashFields(), hpe.getDomain(), hpe.getPartitionsMap());
op1.setPhysicalOperator(hpme);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
index 059d52a..17bf600 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
@@ -75,6 +75,10 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
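
The fastutil-core dependency supplies the primitive-keyed Int2IntOpenHashMap used below to hold the storage-to-compute partition mapping without boxing. A tiny usage sketch (values are hypothetical):

    import it.unimi.dsi.fastutil.ints.Int2IntMap;
    import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;

    public class FastutilSketch {
        public static void main(String[] args) {
            Int2IntMap storagePartition2Compute = new Int2IntOpenHashMap();
            storagePartition2Compute.put(0, 0);
            storagePartition2Compute.put(1, 0);
            storagePartition2Compute.put(2, 1);
            // primitive getOrDefault avoids autoboxing on the hot partitioning path
            System.out.println(storagePartition2Compute.getOrDefault(2, Integer.MIN_VALUE)); // 1
            System.out.println(storagePartition2Compute.getOrDefault(9, Integer.MIN_VALUE)); // -2147483648
        }
    }
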
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
index 31a959f..186ef1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
@@ -23,10 +23,13 @@
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
public class FieldHashPartitionComputer extends HashPartitioner implements ITuplePartitionComputer {
- public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
- super(hashFields, hashFunctions);
+ public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions,
+ Int2IntMap storagePartition2Compute) {
+ super(hashFields, hashFunctions, storagePartition2Compute);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 52df3b7..c91a0ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -24,14 +24,27 @@
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
public class FieldHashPartitionComputerFactory implements ITuplePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
+
+ private static final long serialVersionUID = 2L;
private final int[] hashFields;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+ private final int[][] partitionsMap;
public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories) {
this.hashFields = hashFields;
this.hashFunctionFactories = hashFunctionFactories;
+ this.partitionsMap = null;
+ }
+
+ public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ int[][] partitionsMap) {
+ this.hashFields = hashFields;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -40,6 +53,16 @@
for (int i = 0; i < hashFunctionFactories.length; ++i) {
hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
}
- return new FieldHashPartitionComputer(hashFields, hashFunctions);
+ if (partitionsMap == null) {
+ return new FieldHashPartitionComputer(hashFields, hashFunctions, null);
+ } else {
+ Int2IntMap storagePartition2Compute = new Int2IntOpenHashMap();
+ for (int i = 0; i < partitionsMap.length; i++) {
+ for (int storagePartition : partitionsMap[i]) {
+ storagePartition2Compute.put(storagePartition, i);
+ }
+ }
+ return new FieldHashPartitionComputer(hashFields, hashFunctions, storagePartition2Compute);
+ }
}
}
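
The factory inverts the optimizer's partitions map (compute partition -> owned storage partitions) into a flat storage-partition -> compute-partition lookup before handing it to the computer. The same inversion in isolation (illustrative only; the standalone class name is hypothetical):

    import it.unimi.dsi.fastutil.ints.Int2IntMap;
    import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;

    public class PartitionsMapInversionSketch {
        public static void main(String[] args) {
            // compute partition 0 owns storage partitions 0 and 1; compute partition 1 owns 2 and 3
            int[][] partitionsMap = { { 0, 1 }, { 2, 3 } };
            Int2IntMap storagePartition2Compute = new Int2IntOpenHashMap();
            for (int computePartition = 0; computePartition < partitionsMap.length; computePartition++) {
                for (int storagePartition : partitionsMap[computePartition]) {
                    storagePartition2Compute.put(storagePartition, computePartition);
                }
            }
            // e.g. {0=>0, 1=>0, 2=>1, 3=>1} (iteration order may vary)
            System.out.println(storagePartition2Compute);
        }
    }
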
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
index 5620a95..e36315c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
@@ -28,7 +28,7 @@
private final int numPartitions;
public FieldHashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, int numPartitions) {
- super(hashFields, hashFunctions);
+ super(hashFields, hashFunctions, null);
this.numPartitions = numPartitions;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
index b09bcb8..cb97d1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
@@ -22,14 +22,18 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
class HashPartitioner {
private final int[] hashFields;
private final IBinaryHashFunction[] hashFunctions;
+ private final Int2IntMap storagePartition2Compute;
- public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
+ public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, Int2IntMap storagePartition2Compute) {
this.hashFields = hashFields;
this.hashFunctions = hashFunctions;
+ this.storagePartition2Compute = storagePartition2Compute;
}
protected int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
@@ -50,6 +54,15 @@
if (h < 0) {
h = -(h + 1);
}
- return h % nParts;
+ if (storagePartition2Compute == null) {
+ return h % nParts;
+ } else {
+ int storagePartition = h % storagePartition2Compute.size();
+ int computePartition = storagePartition2Compute.getOrDefault(storagePartition, Integer.MIN_VALUE);
+ if (computePartition < 0 || computePartition >= nParts) {
+ throw new IllegalStateException("couldn't resolve storage partition to compute partition");
+ }
+ return computePartition;
+ }
}
}
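
Putting it together, routing with a partitions map is a two-step lookup: the tuple's hash first selects a storage partition (modulo the number of storage partitions, i.e. the map's size), which is then resolved to the compute partition that owns it. A standalone sketch of that routing logic under the same assumptions (hypothetical class name and sample values):

    import it.unimi.dsi.fastutil.ints.Int2IntMap;
    import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;

    public class TwoStepRoutingSketch {
        public static void main(String[] args) {
            // 4 storage partitions folded onto 2 compute partitions
            Int2IntMap storagePartition2Compute = new Int2IntOpenHashMap();
            storagePartition2Compute.put(0, 0);
            storagePartition2Compute.put(1, 0);
            storagePartition2Compute.put(2, 1);
            storagePartition2Compute.put(3, 1);
            int nParts = 2;            // number of consumer (compute) partitions
            int h = 123456789;         // stands in for the tuple's non-negative hash value
            int storagePartition = h % storagePartition2Compute.size();
            int computePartition = storagePartition2Compute.getOrDefault(storagePartition, Integer.MIN_VALUE);
            if (computePartition < 0 || computePartition >= nParts) {
                throw new IllegalStateException("couldn't resolve storage partition to compute partition");
            }
            System.out.println(storagePartition + " -> " + computePartition); // 1 -> 0
        }
    }
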