[ASTERIXDB-3144][RT] Make hash exchanges consider partitions map
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Some operators, such as INSERT and UPSERT, require their input to be
hash partitioned according to a partitions map. This patch makes the
hash exchanges satisfy this requirement (see the sketch below):
- Hash exchanges take an optional partitions map to use when
  hash partitioning.
- Make sure the partitions map is considered when comparing
  partitioning properties.
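For context, a minimal, hypothetical sketch of how such a partitions
map (partitionsMap[i] = the storage partitions owned by compute
partition i) resolves a tuple's hash to a compute partition. It mirrors
the storage-to-compute mapping built in FieldHashPartitionComputerFactory
and applied in HashPartitioner; the resolveComputePartition helper below
is illustrative only and not part of this patch:

    // Assumes hash is already non-negative, as in HashPartitioner.
    // Example: resolveComputePartition(7, new int[][] {{0, 1}, {2, 3}})
    // -> 7 % 4 = storage partition 3 -> compute partition 1.
    static int resolveComputePartition(int hash, int[][] partitionsMap) {
        // Total number of storage partitions across all compute partitions.
        int numStoragePartitions = 0;
        for (int[] storagePartitions : partitionsMap) {
            numStoragePartitions += storagePartitions.length;
        }
        // Hash into the storage-partition space, then map to the owning
        // compute partition.
        int storagePartition = hash % numStoragePartitions;
        for (int compute = 0; compute < partitionsMap.length; compute++) {
            for (int sp : partitionsMap[compute]) {
                if (sp == storagePartition) {
                    return compute;
                }
            }
        }
        throw new IllegalStateException(
                "couldn't resolve storage partition " + storagePartition);
    }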
Change-Id: I71457603048e9be9467943918e21ce5ede658c19
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17489
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java
index 5086e32..3b3bc5d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java
@@ -20,21 +20,25 @@
import java.util.List;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
public interface IDataSourcePropertiesProvider {
/**
- *
* @param scanVariables
+ * @param ctx
* @return
*/
- IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables);
+ IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables, IOptimizationContext ctx)
+ throws AlgebricksException;
/**
- *
* @param scanVariables
+ * @param ctx
* @return
*/
- IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables);
+ IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables, IOptimizationContext ctx)
+ throws AlgebricksException;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
index f77d9db..82bf431 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
@@ -44,15 +44,16 @@
private final List<LogicalVariable> outputExtraVars;
private final List<List<LogicalVariable>> inputExtraVars;
+ private final int[][] partitionsMap;
- public IntersectOperator(List<LogicalVariable> outputCompareVars, List<List<LogicalVariable>> inputCompareVars)
- throws AlgebricksException {
- this(outputCompareVars, Collections.emptyList(), inputCompareVars, Collections.emptyList());
+ public IntersectOperator(List<LogicalVariable> outputCompareVars, List<List<LogicalVariable>> inputCompareVars,
+ int[][] partitionsMap) throws AlgebricksException {
+ this(outputCompareVars, Collections.emptyList(), inputCompareVars, Collections.emptyList(), partitionsMap);
}
public IntersectOperator(List<LogicalVariable> outputCompareVars, List<LogicalVariable> outputExtraVars,
- List<List<LogicalVariable>> inputCompareVars, List<List<LogicalVariable>> inputExtraVars)
- throws AlgebricksException {
+ List<List<LogicalVariable>> inputCompareVars, List<List<LogicalVariable>> inputExtraVars,
+ int[][] partitionsMap) throws AlgebricksException {
int numCompareVars = outputCompareVars.size();
for (List<LogicalVariable> vars : inputCompareVars) {
if (vars.size() != numCompareVars) {
@@ -75,6 +76,7 @@
}
}
+ this.partitionsMap = partitionsMap;
this.outputCompareVars = new ArrayList<>(outputCompareVars);
this.inputCompareVars = new ArrayList<>(inputCompareVars);
this.outputExtraVars = new ArrayList<>();
@@ -174,6 +176,10 @@
return outputExtraVars;
}
+ public int[][] getPartitionsMap() {
+ return partitionsMap;
+ }
+
private List<LogicalVariable> concatOutputVariables() {
return ListUtils.union(outputCompareVars, outputExtraVars);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 5a6574a..dfd6398 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -77,6 +78,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -384,9 +386,9 @@
if (op.getOperatorTag() != LogicalOperatorTag.INTERSECT) {
return Boolean.FALSE;
}
- IntersectOperator intersetOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg);
+ IntersectOperator intersectOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg);
List<LogicalVariable> outputCompareVars = op.getOutputCompareVariables();
- List<LogicalVariable> outputCompareVarsArg = intersetOpArg.getOutputCompareVariables();
+ List<LogicalVariable> outputCompareVarsArg = intersectOpArg.getOutputCompareVariables();
if (outputCompareVars.size() != outputCompareVarsArg.size()) {
return Boolean.FALSE;
}
@@ -395,7 +397,7 @@
}
boolean hasExtraVars = op.hasExtraVariables();
List<LogicalVariable> outputExtraVars = op.getOutputExtraVariables();
- List<LogicalVariable> outputExtraVarsArg = intersetOpArg.getOutputExtraVariables();
+ List<LogicalVariable> outputExtraVarsArg = intersectOpArg.getOutputExtraVariables();
if (outputExtraVars.size() != outputExtraVarsArg.size()) {
return Boolean.FALSE;
}
@@ -404,19 +406,22 @@
}
int nInput = op.getNumInput();
- if (nInput != intersetOpArg.getNumInput()) {
+ if (nInput != intersectOpArg.getNumInput()) {
return Boolean.FALSE;
}
for (int i = 0; i < nInput; i++) {
if (!VariableUtilities.varListEqualUnordered(op.getInputCompareVariables(i),
- intersetOpArg.getInputCompareVariables(i))) {
+ intersectOpArg.getInputCompareVariables(i))) {
return Boolean.FALSE;
}
if (hasExtraVars && !VariableUtilities.varListEqualUnordered(op.getInputExtraVariables(i),
- intersetOpArg.getInputExtraVariables(i))) {
+ intersectOpArg.getInputExtraVariables(i))) {
return Boolean.FALSE;
}
}
+ if (!Arrays.deepEquals(op.getPartitionsMap(), intersectOpArg.getPartitionsMap())) {
+ return Boolean.FALSE;
+ }
return Boolean.TRUE;
}
@@ -543,9 +548,15 @@
if (!partProp.getNodeDomain().sameAs(partPropArg.getNodeDomain())) {
return Boolean.FALSE;
}
- List<LogicalVariable> columns = new ArrayList<LogicalVariable>();
+ if (partProp.getPartitioningType() == IPartitioningProperty.PartitioningType.UNORDERED_PARTITIONED) {
+ if (!((UnorderedPartitionedProperty) partProp)
+ .samePartitioningScheme(((UnorderedPartitionedProperty) partPropArg))) {
+ return Boolean.FALSE;
+ }
+ }
+ List<LogicalVariable> columns = new ArrayList<>();
partProp.getColumns(columns);
- List<LogicalVariable> columnsArg = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> columnsArg = new ArrayList<>();
partPropArg.getColumns(columnsArg);
if (columns.size() != columnsArg.size()) {
return Boolean.FALSE;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 71a659b..b49fbcc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -523,8 +524,12 @@
inputExtraVarsCopy.add(deepCopyVariableList(op.getInputExtraVariables(i)));
}
}
+ int[][] partitionsMap = op.getPartitionsMap();
+ int[][] partitionsMapCopy =
+ partitionsMap == null ? null : Arrays.stream(partitionsMap).map(int[]::clone).toArray(int[][]::new);
+
IntersectOperator opCopy = new IntersectOperator(outputCompareVarsCopy, outputExtraVarsCopy,
- inputCompareVarsCopy, inputExtraVarsCopy);
+ inputCompareVarsCopy, inputExtraVarsCopy, partitionsMapCopy);
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index b7029d1..5112812 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -20,6 +20,7 @@
package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -230,7 +231,11 @@
newInputExtraVars.add(new ArrayList<>(op.getInputExtraVariables(i)));
}
}
- return new IntersectOperator(newOutputCompareVars, newOutputExtraVars, newInputCompareVars, newInputExtraVars);
+ int[][] partitionsMap = op.getPartitionsMap();
+ int[][] partitionsMapCopy =
+ partitionsMap == null ? null : Arrays.stream(partitionsMap).map(int[]::clone).toArray(int[][]::new);
+ return new IntersectOperator(newOutputCompareVars, newOutputExtraVars, newInputCompareVars, newInputExtraVars,
+ partitionsMapCopy);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 38ccee5..a594e7e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -94,21 +94,19 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext ctx) {
// In a cost-based optimizer, we would also try to propagate the
// parent's partitioning requirements.
IPartitioningProperty pp1;
IPartitioningProperty pp2;
switch (partitioningType) {
case PAIRWISE:
- pp1 = new UnorderedPartitionedProperty(new ListSet<>(keysLeftBranch),
- context.getComputationNodeDomain());
- pp2 = new UnorderedPartitionedProperty(new ListSet<>(keysRightBranch),
- context.getComputationNodeDomain());
+ pp1 = UnorderedPartitionedProperty.of(new ListSet<>(keysLeftBranch), ctx.getComputationNodeDomain());
+ pp2 = UnorderedPartitionedProperty.of(new ListSet<>(keysRightBranch), ctx.getComputationNodeDomain());
break;
case BROADCAST:
- pp1 = new RandomPartitioningProperty(context.getComputationNodeDomain());
- pp2 = new BroadcastPartitioningProperty(context.getComputationNodeDomain());
+ pp1 = new RandomPartitioningProperty(ctx.getComputationNodeDomain());
+ pp2 = new BroadcastPartitioningProperty(ctx.getComputationNodeDomain());
break;
default:
throw new IllegalStateException();
@@ -141,9 +139,9 @@
(UnorderedPartitionedProperty) firstDeliveredPartitioning;
Set<LogicalVariable> set1 = upp1.getColumnSet();
UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) requirements;
- Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> modifuppreq = new ListSet<>();
Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
- Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> covered = new ListSet<>();
Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent))
? keysRightBranch : keysLeftBranch;
@@ -182,8 +180,14 @@
+ " to agree with partitioning property " + firstDeliveredPartitioning
+ " delivered by previous input operator.");
}
- UnorderedPartitionedProperty upp2 =
- new UnorderedPartitionedProperty(modifuppreq, requirements.getNodeDomain());
+ UnorderedPartitionedProperty upp2;
+ UnorderedPartitionedProperty rqd = (UnorderedPartitionedProperty) requirements;
+ if (rqd.usesPartitionsMap()) {
+ upp2 = UnorderedPartitionedProperty.ofPartitionsMap(modifuppreq,
+ rqd.getNodeDomain(), rqd.getPartitionsMap());
+ } else {
+ upp2 = UnorderedPartitionedProperty.of(modifuppreq, rqd.getNodeDomain());
+ }
return new Pair<>(false, upp2);
}
case ORDERED_PARTITIONED: {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
index a81bf97..2f02a61 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
@@ -65,7 +65,7 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
- pp = new UnorderedPartitionedProperty(new ListSet<>(columnList), context.getComputationNodeDomain());
+ pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList), context.getComputationNodeDomain());
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index d2cc5d9..969fd99 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -230,7 +230,7 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
- pp = new UnorderedPartitionedProperty(new ListSet<>(columnList), context.getComputationNodeDomain());
+ pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList), context.getComputationNodeDomain());
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
index 5159ac5..05b441c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -27,7 +28,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
return emptyUnaryRequirements();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index 560435e..fcc8c8e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -79,7 +79,7 @@
IPartitioningProperty pp;
switch (op.getExecutionMode()) {
case PARTITIONED:
- pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
+ pp = UnorderedPartitionedProperty.of(new ListSet<>(partitionColumns),
context.getComputationNodeDomain());
break;
case UNPARTITIONED:
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 2543330..682d1cf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -72,12 +72,12 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
List<LogicalVariable> scanVariables = new ArrayList<>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
IPhysicalPropertiesVector physicalProps =
- dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables);
+ dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables, context);
StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
physicalProps.getLocalProperties());
return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
index 80843a4..ea19a78 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
@@ -74,11 +74,12 @@
}
@Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
// partitioning properties
DataSourceScanOperator dssOp = (DataSourceScanOperator) op;
IDataSourcePropertiesProvider dspp = dataSource.getPropertiesProvider();
- deliveredProperties = dspp.computeDeliveredProperties(dssOp.getVariables());
+ deliveredProperties = dspp.computeDeliveredProperties(dssOp.getVariables(), context);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 7515258..89e17ad 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -108,8 +108,9 @@
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
- pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(
- new ListSet<LogicalVariable>(columnList), context.getComputationNodeDomain()), null);
+ pv[0] = new StructuralPropertiesVector(
+ UnorderedPartitionedProperty.of(new ListSet<>(columnList), context.getComputationNodeDomain()),
+ null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 91dba24..55b40b4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -47,12 +47,14 @@
public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
- private List<LogicalVariable> hashFields;
- private INodeDomain domain;
+ private final List<LogicalVariable> hashFields;
+ private final INodeDomain domain;
+ private final int[][] partitionsMap;
- public HashPartitionExchangePOperator(List<LogicalVariable> hashFields, INodeDomain domain) {
+ public HashPartitionExchangePOperator(List<LogicalVariable> hashFields, INodeDomain domain, int[][] partitionsMap) {
this.hashFields = hashFields;
this.domain = domain;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -70,7 +72,12 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(hashFields), domain);
+ IPartitioningProperty p;
+ if (partitionsMap != null) {
+ p = UnorderedPartitionedProperty.ofPartitionsMap(new ListSet<>(hashFields), domain, partitionsMap);
+ } else {
+ p = UnorderedPartitionedProperty.of(new ListSet<>(hashFields), domain);
+ }
this.deliveredProperties = new StructuralPropertiesVector(p, null);
}
@@ -98,9 +105,13 @@
hashFunctionFactories[i] = hashFunProvider.getBinaryHashFunctionFactory(env.getVarType(v));
++i;
}
- ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
+ ITuplePartitionComputerFactory tpcf =
+ new FieldHashPartitionComputerFactory(keys, hashFunctionFactories, partitionsMap);
IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
return new Pair<>(conn, null);
}
+ public int[][] getPartitionsMap() {
+ return partitionsMap;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index c5ce871..5861464 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -63,12 +63,14 @@
private final List<OrderColumn> orderColumns;
private final List<LogicalVariable> partitionFields;
private final INodeDomain domain;
+ private final int[][] partitionsMap;
public HashPartitionMergeExchangePOperator(List<OrderColumn> orderColumns, List<LogicalVariable> partitionFields,
- INodeDomain domain) {
+ INodeDomain domain, int[][] partitionsMap) {
this.orderColumns = orderColumns;
this.partitionFields = partitionFields;
this.domain = domain;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -82,11 +84,15 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p =
- new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(partitionFields), domain);
+ IPartitioningProperty p;
+ if (partitionsMap != null) {
+ p = UnorderedPartitionedProperty.ofPartitionsMap(new ListSet<>(partitionFields), domain, partitionsMap);
+ } else {
+ p = UnorderedPartitionedProperty.of(new ListSet<>(partitionFields), domain);
+ }
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
- List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
+ List<ILocalStructuralProperty> locals = new ArrayList<>();
for (ILocalStructuralProperty prop : op2Locals) {
if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
locals.add(prop);
@@ -133,7 +139,8 @@
++i;
}
}
- ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
+ ITuplePartitionComputerFactory tpcf =
+ new FieldHashPartitionComputerFactory(keys, hashFunctionFactories, partitionsMap);
int n = orderColumns.size();
int[] sortFields = new int[n];
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index aee268b..50bdf8a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -84,14 +84,14 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
//skVarMap is used to remove duplicated variable references for order operator
Map<Integer, Object> skVarMap = new HashMap<Integer, Object>();
List<LogicalVariable> scanVariables = new ArrayList<>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
- IPhysicalPropertiesVector physicalProps =
- dataSourceIndex.getDataSource().getPropertiesProvider().computeRequiredProperties(scanVariables);
+ IPhysicalPropertiesVector physicalProps = dataSourceIndex.getDataSource().getPropertiesProvider()
+ .computeRequiredProperties(scanVariables, context);
List<ILocalStructuralProperty> localProperties = new ArrayList<>();
List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
// Data needs to be sorted based on the [token, number of token, PK]
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 0386275..c4f912a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -101,15 +101,15 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
for (int i = 0; i < numOfAdditionalNonFilteringFields; i++) {
scanVariables.add(new LogicalVariable(-1));
}
- IPhysicalPropertiesVector r =
- dataSourceIndex.getDataSource().getPropertiesProvider().computeRequiredProperties(scanVariables);
+ IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
+ .computeRequiredProperties(scanVariables, context);
r.getLocalProperties().clear();
IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
requirements[0] = r;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 7527fa6..ab4ee61 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -80,14 +80,15 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(keys);
scanVariables.add(payload);
if (additionalNonFilteringFields != null) {
scanVariables.addAll(additionalNonFilteringFields);
}
- IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables);
+ IPhysicalPropertiesVector r =
+ dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables, context);
r.getLocalProperties().clear();
IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
requirements[0] = r;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index 9a595c5..2b838d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -79,7 +80,11 @@
IPartitioningProperty pp = null;
if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getInputCompareVariables(i));
- pp = new UnorderedPartitionedProperty(partitioningVariables, null);
+ INodeDomain nodeDomain = context.getComputationNodeDomain();
+ int[][] partitionsMap = intersectOp.getPartitionsMap();
+ pp = partitionsMap != null
+ ? UnorderedPartitionedProperty.ofPartitionsMap(partitioningVariables, nodeDomain, partitionsMap)
+ : UnorderedPartitionedProperty.of(partitioningVariables, nodeDomain);
}
pv[i] = new StructuralPropertiesVector(pp, localProps);
}
@@ -173,4 +178,12 @@
public boolean expensiveThanMaterialization() {
return false;
}
+
+ public static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 1e97182..55ba9f9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -80,11 +80,12 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(keys);
scanVariables.add(new LogicalVariable(-1));
- IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables);
+ IPhysicalPropertiesVector r =
+ dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables, context);
IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
requirements[0] = r;
return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
index a49c4b3..d6f4812 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
@@ -43,7 +43,7 @@
@Override
public Pair<Boolean, IPartitioningProperty> coordinateRequirements(IPartitioningProperty requirements,
IPartitioningProperty firstDeliveredPartitioning, ILogicalOperator op, IOptimizationContext context) {
- return new Pair<Boolean, IPartitioningProperty>(true, requirements);
+ return new Pair<>(true, requirements);
}
};
@@ -62,9 +62,9 @@
(UnorderedPartitionedProperty) firstDeliveredPartitioning;
Set<LogicalVariable> set1 = upp1.getColumnSet();
UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) rqdpp;
- Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> modifuppreq = new ListSet<>();
Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
- Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> covered = new ListSet<>();
// coordinate from an existing partition property
// (firstDeliveredPartitioning)
@@ -94,16 +94,22 @@
"The number of variables are not equal in both partitioning sides");
}
- UnorderedPartitionedProperty upp2 =
- new UnorderedPartitionedProperty(modifuppreq, rqdpp.getNodeDomain());
- return new Pair<Boolean, IPartitioningProperty>(false, upp2);
+ UnorderedPartitionedProperty upp2;
+ UnorderedPartitionedProperty rqd = (UnorderedPartitionedProperty) rqdpp;
+ if (rqd.usesPartitionsMap()) {
+ upp2 = UnorderedPartitionedProperty.ofPartitionsMap(modifuppreq,
+ rqd.getNodeDomain(), rqd.getPartitionsMap());
+ } else {
+ upp2 = UnorderedPartitionedProperty.of(modifuppreq, rqd.getNodeDomain());
+ }
+ return new Pair<>(false, upp2);
}
case ORDERED_PARTITIONED: {
throw new NotImplementedException();
}
}
}
- return new Pair<Boolean, IPartitioningProperty>(true, rqdpp);
+ return new Pair<>(true, rqdpp);
}
};
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index 1025e44..fbc97d9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -136,6 +136,9 @@
case UNORDERED_PARTITIONED: {
UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
UnorderedPartitionedProperty ud = (UnorderedPartitionedProperty) dlvd;
+ if (!ur.samePartitioningScheme(ud)) {
+ return false;
+ }
if (mayExpandProperties) {
return (!ud.getColumnSet().isEmpty() && ur.getColumnSet().containsAll(ud.getColumnSet()));
} else {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
index fa8650c..6d4c389 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.core.algebra.properties;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -30,10 +31,22 @@
public final class UnorderedPartitionedProperty extends AbstractGroupingProperty implements IPartitioningProperty {
private INodeDomain domain;
+ private final int[][] partitionsMap;
- public UnorderedPartitionedProperty(Set<LogicalVariable> partitioningVariables, INodeDomain domain) {
+ private UnorderedPartitionedProperty(Set<LogicalVariable> partitioningVariables, INodeDomain domain,
+ int[][] partitionsMap) {
super(partitioningVariables);
this.domain = domain;
+ this.partitionsMap = partitionsMap;
+ }
+
+ public static UnorderedPartitionedProperty of(Set<LogicalVariable> partitioningVariables, INodeDomain domain) {
+ return new UnorderedPartitionedProperty(partitioningVariables, domain, null);
+ }
+
+ public static UnorderedPartitionedProperty ofPartitionsMap(Set<LogicalVariable> partitioningVariables,
+ INodeDomain domain, int[][] partitionsMap) {
+ return new UnorderedPartitionedProperty(partitioningVariables, domain, partitionsMap);
}
@Override
@@ -46,7 +59,7 @@
List<FunctionalDependency> fds) {
Set<LogicalVariable> normalizedColumnSet =
normalizeAndReduceGroupingColumns(columnSet, equivalenceClasses, fds);
- return new UnorderedPartitionedProperty(normalizedColumnSet, domain);
+ return new UnorderedPartitionedProperty(normalizedColumnSet, domain, partitionsMap);
}
@Override
@@ -79,12 +92,23 @@
applied = true;
}
}
- return applied ? new UnorderedPartitionedProperty(newColumnSet, domain) : this;
+ return applied ? new UnorderedPartitionedProperty(newColumnSet, domain, partitionsMap) : this;
}
@Override
public IPartitioningProperty clonePartitioningProperty() {
- return new UnorderedPartitionedProperty(new ListSet<>(columnSet), domain);
+ return new UnorderedPartitionedProperty(new ListSet<>(columnSet), domain, partitionsMap);
}
+ public int[][] getPartitionsMap() {
+ return partitionsMap;
+ }
+
+ public boolean usesPartitionsMap() {
+ return partitionsMap != null;
+ }
+
+ public boolean samePartitioningScheme(UnorderedPartitionedProperty another) {
+ return Arrays.deepEquals(partitionsMap, another.partitionsMap);
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 3ae1218..4d32d97 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -654,31 +654,24 @@
private IPhysicalOperator createHashConnector(IOptimizationContext ctx, IPhysicalPropertiesVector deliveredByChild,
INodeDomain domain, IPhysicalPropertiesVector requiredAtChild, IPartitioningProperty rqdPartitioning,
int childIndex, ILogicalOperator parentOp) {
- IPhysicalOperator hashConnector;
- List<LogicalVariable> vars = new ArrayList<>(((UnorderedPartitionedProperty) rqdPartitioning).getColumnSet());
+ UnorderedPartitionedProperty rqd = (UnorderedPartitionedProperty) rqdPartitioning;
+ List<LogicalVariable> vars = new ArrayList<>(rqd.getColumnSet());
String hashMergeHint = (String) ctx.getMetadataProvider().getConfig().get(HASH_MERGE);
if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
- hashConnector = new HashPartitionExchangePOperator(vars, domain);
- return hashConnector;
+ return new HashPartitionExchangePOperator(vars, domain, rqd.getPartitionsMap());
}
List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
List<ILocalStructuralProperty> reqdLocals = requiredAtChild.getLocalProperties();
- boolean propWasSet = false;
- hashConnector = null;
if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
AbstractLogicalOperator c = (AbstractLogicalOperator) parentOp.getInputs().get(childIndex).getValue();
Map<LogicalVariable, EquivalenceClass> ecs = ctx.getEquivalenceClassMap(c);
List<FunctionalDependency> fds = ctx.getFDList(c);
if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals);
- hashConnector = new HashPartitionMergeExchangePOperator(orderColumns, vars, domain);
- propWasSet = true;
+ return new HashPartitionMergeExchangePOperator(orderColumns, vars, domain, rqd.getPartitionsMap());
}
}
- if (!propWasSet) {
- hashConnector = new HashPartitionExchangePOperator(vars, domain);
- }
- return hashConnector;
+ return new HashPartitionExchangePOperator(vars, domain, rqd.getPartitionsMap());
}
/**
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
index 1b02ab4..15ba92d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
@@ -94,23 +94,24 @@
// If yes, we use HashMergeExchange; otherwise, we use HashExchange.
SortMergeExchangePOperator sme = (SortMergeExchangePOperator) currentOp.getPhysicalOperator();
HashPartitionExchangePOperator hpe = (HashPartitionExchangePOperator) op1.getPhysicalOperator();
- Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+ Set<LogicalVariable> liveVars = new HashSet<>();
VariableUtilities.getLiveVariables(op1, liveVars);
boolean usingHashMergeExchange = true;
for (OrderColumn oc : sme.getSortColumns()) {
if (!liveVars.contains(oc.getColumn())) {
usingHashMergeExchange = false;
+ break;
}
}
if (usingHashMergeExchange) {
// Add sort columns from the SortMergeExchange into a new HashMergeExchange.
- List<OrderColumn> ocList = new ArrayList<OrderColumn>();
+ List<OrderColumn> ocList = new ArrayList<>();
for (OrderColumn oc : sme.getSortColumns()) {
ocList.add(oc);
}
- HashPartitionMergeExchangePOperator hpme =
- new HashPartitionMergeExchangePOperator(ocList, hpe.getHashFields(), hpe.getDomain());
+ HashPartitionMergeExchangePOperator hpme = new HashPartitionMergeExchangePOperator(ocList,
+ hpe.getHashFields(), hpe.getDomain(), hpe.getPartitionsMap());
op1.setPhysicalOperator(hpme);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
index 059d52a..17bf600 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
@@ -75,6 +75,10 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
index 31a959f..186ef1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
@@ -23,10 +23,13 @@
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
public class FieldHashPartitionComputer extends HashPartitioner implements ITuplePartitionComputer {
- public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
- super(hashFields, hashFunctions);
+ public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions,
+ Int2IntMap storagePartition2Compute) {
+ super(hashFields, hashFunctions, storagePartition2Compute);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 52df3b7..c91a0ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -24,14 +24,27 @@
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
public class FieldHashPartitionComputerFactory implements ITuplePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
+
+ private static final long serialVersionUID = 2L;
private final int[] hashFields;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+ private final int[][] partitionsMap;
public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories) {
this.hashFields = hashFields;
this.hashFunctionFactories = hashFunctionFactories;
+ this.partitionsMap = null;
+ }
+
+ public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ int[][] partitionsMap) {
+ this.hashFields = hashFields;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -40,6 +53,16 @@
for (int i = 0; i < hashFunctionFactories.length; ++i) {
hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
}
- return new FieldHashPartitionComputer(hashFields, hashFunctions);
+ if (partitionsMap == null) {
+ return new FieldHashPartitionComputer(hashFields, hashFunctions, null);
+ } else {
+ Int2IntMap storagePartition2Compute = new Int2IntOpenHashMap();
+ for (int i = 0; i < partitionsMap.length; i++) {
+ for (int storagePartition : partitionsMap[i]) {
+ storagePartition2Compute.put(storagePartition, i);
+ }
+ }
+ return new FieldHashPartitionComputer(hashFields, hashFunctions, storagePartition2Compute);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
index 5620a95..e36315c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
@@ -28,7 +28,7 @@
private final int numPartitions;
public FieldHashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, int numPartitions) {
- super(hashFields, hashFunctions);
+ super(hashFields, hashFunctions, null);
this.numPartitions = numPartitions;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
index b09bcb8..cb97d1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
@@ -22,14 +22,18 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
class HashPartitioner {
private final int[] hashFields;
private final IBinaryHashFunction[] hashFunctions;
+ private final Int2IntMap storagePartition2Compute;
- public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
+ public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, Int2IntMap storagePartition2Compute) {
this.hashFields = hashFields;
this.hashFunctions = hashFunctions;
+ this.storagePartition2Compute = storagePartition2Compute;
}
protected int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
@@ -50,6 +54,15 @@
if (h < 0) {
h = -(h + 1);
}
- return h % nParts;
+ if (storagePartition2Compute == null) {
+ return h % nParts;
+ } else {
+ int storagePartition = h % storagePartition2Compute.size();
+ int computePartition = storagePartition2Compute.getOrDefault(storagePartition, Integer.MIN_VALUE);
+ if (computePartition < 0 || computePartition >= nParts) {
+ throw new IllegalStateException("couldn't resolve storage partition to compute partition");
+ }
+ return computePartition;
+ }
}
}