ASTERIXDB-1343: support heterogeneity of computation nodes and storage nodes.
Change-Id: Ic21d8da2cd457aa17cc9861c0b92ac5960978e03
Reviewed-on: https://asterix-gerrit.ics.uci.edu/718
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 0d899b9..1d3a55c 100644
--- a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.compiler.api;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -51,11 +52,11 @@
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
- PhysicalOptimizationConfig physicalOptimizationConfig) {
+ PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) {
LogicalOperatorPrettyPrintVisitor prettyPrintVisitor = new LogicalOperatorPrettyPrintVisitor();
return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
mergeAggregationExpressionFactory, expressionTypeComputer, nullableTypeComputer,
- physicalOptimizationConfig, prettyPrintVisitor);
+ physicalOptimizationConfig, clusterLocations, prettyPrintVisitor);
}
}
@@ -77,7 +78,7 @@
int varCounter) {
final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter,
expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
- nullableTypeComputer, physicalOptimizationConfig);
+ nullableTypeComputer, physicalOptimizationConfig, clusterLocations);
oc.setMetadataDeclarations(metadata);
final HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc);
return new ICompiler() {
@@ -92,13 +93,13 @@
IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n");
JobGenContext context = new JobGenContext(null, metadata, appContext,
- serializerDeserializerProvider, hashFunctionFactoryProvider,
- hashFunctionFamilyProvider, comparatorFactoryProvider, typeTraitProvider,
- binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
- nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
- expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
- partialAggregationTypeComputer, predEvaluatorFactoryProvider,
- physicalOptimizationConfig.getFrameSize(), clusterLocations);
+ serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
+ comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
+ binaryIntegerInspectorFactory, printerProvider, nullWriterFactory,
+ normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer,
+ nullableTypeComputer, oc, expressionEvalSizeComputer, partialAggregationTypeComputer,
+ predEvaluatorFactoryProvider, physicalOptimizationConfig.getFrameSize(),
+ clusterLocations);
PlanCompiler pc = new PlanCompiler(context);
return pc.compilePlan(plan, null, jobEventListenerFactory);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 9d68fae..707a7db 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
@@ -66,7 +65,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException;
+ throws AlgebricksException;
// variables
@@ -88,7 +87,8 @@
* @return for each child, one vector of required physical properties
*/
- public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties);
+ public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties,
+ IOptimizationContext context);
/**
* @return the physical properties that this operator delivers, based on
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 6badac7..328697d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
@@ -35,52 +36,54 @@
public interface IOptimizationContext extends ITypingContext, IVariableContext {
@Override
- public abstract IMetadataProvider<?, ?> getMetadataProvider();
+ public IMetadataProvider<?, ?> getMetadataProvider();
- public abstract void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider);
+ public void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider);
- public abstract boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
+ public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
- public abstract void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
+ public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
/*
* returns true if op1 and op2 have already been compared
*/
- public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
+ public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
- public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
+ public void removeFromAlreadyCompared(ILogicalOperator op1);
- public abstract void addNotToBeInlinedVar(LogicalVariable var);
+ public void addNotToBeInlinedVar(LogicalVariable var);
- public abstract boolean shouldNotBeInlined(LogicalVariable var);
+ public boolean shouldNotBeInlined(LogicalVariable var);
- public abstract void addPrimaryKey(FunctionalDependency pk);
+ public void addPrimaryKey(FunctionalDependency pk);
- public abstract List<LogicalVariable> findPrimaryKey(LogicalVariable var);
+ public List<LogicalVariable> findPrimaryKey(LogicalVariable var);
- public abstract void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap);
+ public void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap);
- public abstract Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op);
+ public Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op);
- public abstract void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList);
+ public void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList);
- public abstract List<FunctionalDependency> getFDList(ILogicalOperator op);
+ public List<FunctionalDependency> getFDList(ILogicalOperator op);
public void clearAllFDAndEquivalenceClasses();
- public abstract void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v);
+ public void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v);
- public abstract ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op);
+ public ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op);
- public abstract IExpressionEvalSizeComputer getExpressionEvalSizeComputer();
+ public IExpressionEvalSizeComputer getExpressionEvalSizeComputer();
- public abstract IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment();
+ public IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment();
- public abstract IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory();
+ public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory();
- public abstract PhysicalOptimizationConfig getPhysicalOptimizationConfig();
+ public PhysicalOptimizationConfig getPhysicalOptimizationConfig();
- public abstract void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
+ public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
- public abstract LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
+ public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
+
+ public INodeDomain getComputationNodeDomain();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index a3ef5d9..8c0ab2f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -35,10 +35,12 @@
* @param reqdByParent
* parent's requirements, which are not enforced for now, as we
* only explore one plan
+ * @param context
+ * the optimization context
* @return for each child, one vector of required physical properties
*/
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent);
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context);
/**
* @return the physical properties that this operator delivers, based on
@@ -51,7 +53,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException;
+ throws AlgebricksException;
public void disableJobGenBelowMe();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 5e9eef6..2410ca0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -102,8 +102,8 @@
@Override
public final PhysicalRequirements getRequiredPhysicalPropertiesForChildren(
- IPhysicalPropertiesVector requiredProperties) {
- return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties);
+ IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) {
+ return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties, context);
}
/**
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 09d2253..7077014 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -86,7 +86,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
// In a cost-based optimizer, we would also try to propagate the
// parent's partitioning requirements.
@@ -97,12 +97,14 @@
if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
switch (partitioningType) {
case PAIRWISE: {
- pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch), null);
- pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch), null);
+ pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch),
+ context.getComputationNodeDomain());
+ pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch),
+ context.getComputationNodeDomain());
break;
}
case BROADCAST: {
- pp2 = new BroadcastPartitioningProperty(null);
+ pp2 = new BroadcastPartitioningProperty(context.getComputationNodeDomain());
break;
}
default: {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 2c93cd4..2d00e4c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -138,7 +138,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
List<ILocalStructuralProperty> localProps = null;
@@ -233,7 +233,8 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
- pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+ pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList),
+ context.getComputationNodeDomain());
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
index 2ab24d2..5159ac5 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -26,7 +27,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 4b13645..3759956 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -69,7 +69,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
if (orderProp == null) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 2a2eb1e..3242fa0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -72,7 +72,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
AggregateOperator aggOp = (AggregateOperator) op;
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
if (aggOp.isGlobal() && aggOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 5b6c40a..5aed63e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -63,7 +63,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
index 03a8666..69bfe2a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
@@ -61,7 +61,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 8ac3271..dda5456 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -72,7 +72,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
List<LogicalVariable> scanVariables = new ArrayList<>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 5823bc0..b3e8385 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -68,7 +68,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
DistributeResultOperator write = (DistributeResultOperator) op;
IDataSink sink = write.getDataSink();
IPartitioningProperty pp = sink.getPartitioningProperty();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
index 4e3a5bc..dcb7e15 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
@@ -56,7 +56,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index b15ea0b..7772620 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -135,12 +135,12 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
- pv[0] = new StructuralPropertiesVector(
- new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnSet), null), null);
+ pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(
+ new ListSet<LogicalVariable>(columnSet), context.getComputationNodeDomain()), null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
@@ -206,13 +206,16 @@
}
List<LogicalVariable> keyAndDecVariables = new ArrayList<LogicalVariable>();
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList())
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) {
keyAndDecVariables.add(p.first);
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList())
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
keyAndDecVariables.add(GroupByOperator.getDecorVariable(p));
+ }
- for (LogicalVariable var : keyAndDecVariables)
+ for (LogicalVariable var : keyAndDecVariables) {
aggOpInputEnv.setVarType(var, outputEnv.getVarType(var));
+ }
compileSubplans(inputSchemas[0], gby, opSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -236,10 +239,12 @@
for (Object type : intermediateTypes) {
aggOpInputEnv.setVarType(usedVars.get(i++), type);
}
- for (LogicalVariable keyVar : keyAndDecVariables)
+ for (LogicalVariable keyVar : keyAndDecVariables) {
localInputSchemas[0].addVariable(keyVar);
- for (LogicalVariable usedVar : usedVars)
+ }
+ for (LogicalVariable usedVar : usedVars) {
localInputSchemas[0].addVariable(usedVar);
+ }
for (i = 0; i < n; i++) {
AggregateFunctionCallExpression mergeFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions()
.get(i).getValue();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 0ff1e47..34c707b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -76,7 +76,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index 21be272..17322b6 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -100,7 +100,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
List<OrderColumn> columns = new ArrayList<OrderColumn>();
for (OrderColumn oc : orderColumns) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index b837bfa..50ab6aa 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -83,7 +83,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
List<LogicalVariable> scanVariables = new ArrayList<>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 4702361..f29fd6f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -87,7 +87,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(primaryKeys);
scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 2e4b647..f0bd603 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -80,7 +80,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(keys);
scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index b6d0f1f..5f43c3e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -21,7 +21,9 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
@@ -40,8 +42,8 @@
import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
@@ -61,7 +63,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
IntersectOperator intersectOp = (IntersectOperator) iop;
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[intersectOp.getNumInput()];
for (int i = 0; i < intersectOp.getNumInput(); i++) {
@@ -73,7 +75,8 @@
localProps.add(new LocalOrderProperty(orderColumns));
IPartitioningProperty pp = null;
if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
- pp = new RandomPartitioningProperty(null);
+ Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getInputVariables(i));
+ pp = new UnorderedPartitionedProperty(partitioningVariables, null);
}
pv[i] = new StructuralPropertiesVector(pp, localProps);
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
index a154d91..c55a4ae 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
@@ -55,7 +55,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index 7ff15d7..2622ae3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -113,13 +113,14 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
if (partitioningType != JoinPartitioningType.BROADCAST) {
throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
}
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
- pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+ pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()),
+ null);
pv[1] = new StructuralPropertiesVector(null, null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
@@ -225,10 +226,11 @@
}
boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
p.getLength());
- if (result)
+ if (result) {
return 0;
- else
+ } else {
return 1;
+ }
}
}
@@ -256,28 +258,31 @@
@Override
public byte[] getFieldData(int fIdx) {
int leftFieldCount = refLeft.getFieldCount();
- if (fIdx < leftFieldCount)
+ if (fIdx < leftFieldCount) {
return refLeft.getFieldData(fIdx);
- else
+ } else {
return refRight.getFieldData(fIdx - leftFieldCount);
+ }
}
@Override
public int getFieldStart(int fIdx) {
int leftFieldCount = refLeft.getFieldCount();
- if (fIdx < leftFieldCount)
+ if (fIdx < leftFieldCount) {
return refLeft.getFieldStart(fIdx);
- else
+ } else {
return refRight.getFieldStart(fIdx - leftFieldCount);
+ }
}
@Override
public int getFieldLength(int fIdx) {
int leftFieldCount = refLeft.getFieldCount();
- if (fIdx < leftFieldCount)
+ if (fIdx < leftFieldCount) {
return refLeft.getFieldLength(fIdx);
- else
+ } else {
return refRight.getFieldLength(fIdx - leftFieldCount);
+ }
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
index 7535c82..e8cc05f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -89,7 +89,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
index 27a5357..818e1ec 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
@@ -51,7 +51,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index df65906..fe11f64 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -78,7 +78,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>();
List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
@@ -89,7 +89,8 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
- pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+ pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList),
+ context.getComputationNodeDomain());
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
@@ -98,7 +99,7 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
IOperatorDescriptorRegistry spec = builder.getJobSpec();
int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
@@ -119,11 +120,11 @@
keysAndDecs[i + keys.length] = fdColumns[i];
}
- IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
- columnList, context.getTypeEnvironment(op), context);
+ IBinaryComparatorFactory[] comparatorFactories = JobGenHelper
+ .variablesToAscBinaryComparatorFactories(columnList, context.getTypeEnvironment(op), context);
IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
- IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(
- aggFactories, keysAndDecs);
+ IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories,
+ keysAndDecs);
RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
index bfe8b36..e11a64f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
@@ -51,7 +51,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
index af0087d..7237d24 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -85,7 +85,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
index d47c31f..1f70f2a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
@@ -105,7 +105,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
List<OrderColumn> columns = new ArrayList<OrderColumn>();
for (OrderColumn oc : partitioningFields) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index dca3d97..9046ce2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -85,7 +85,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index 19fb5d4..14a8f16 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -50,7 +50,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 2c842e3..8e4ca18 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -63,7 +63,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 6246ad2..71acecf 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -75,7 +75,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements(op.getInputs().size());
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 0b100ee..35f9444 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -67,7 +67,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
WriteOperator write = (WriteOperator) op;
IDataSink sink = write.getDataSink();
IPartitioningProperty pp = sink.getPartitioningProperty();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index 7465711..c08ff85 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -138,7 +138,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
index f264284..81f6e6b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -121,7 +121,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(sortColumns.length);
localProps.add(new LocalOrderProperty(Arrays.asList(sortColumns)));
StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index f47e671..99be356 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -71,7 +71,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 326172f..184cbbc 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -49,7 +49,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
index 2c40b3b..9b35922 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
@@ -54,7 +54,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
index bd46230..1f5159d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -49,7 +49,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index aa5672e..9fc0dd4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -77,7 +77,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
index 98427fe..557a657 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -66,7 +66,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 2c407b7..e6517d0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -64,7 +64,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
StructuralPropertiesVector pv0 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
StructuralPropertiesVector pv1 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 },
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 9c8ddc3..7ec1914 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -80,7 +80,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
scanVariables.addAll(keys);
scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
index 908dcc1..719c70e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
@@ -18,26 +18,53 @@
*/
package org.apache.hyracks.algebricks.core.algebra.properties;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+
public class DefaultNodeGroupDomain implements INodeDomain {
- private String groupName;
+ private List<String> nodes = new ArrayList<>();
- public DefaultNodeGroupDomain(String groupName) {
- this.groupName = groupName;
+ public DefaultNodeGroupDomain(List<String> nodes) {
+ this.nodes.addAll(nodes);
+ }
+
+ public DefaultNodeGroupDomain(DefaultNodeGroupDomain domain) {
+ this.nodes.addAll(domain.nodes);
+ }
+
+ public DefaultNodeGroupDomain(AlgebricksPartitionConstraint clusterLocations) {
+ if (clusterLocations.getPartitionConstraintType() == PartitionConstraintType.ABSOLUTE) {
+ AlgebricksAbsolutePartitionConstraint absPc = (AlgebricksAbsolutePartitionConstraint) clusterLocations;
+ String[] locations = absPc.getLocations();
+ for (String location : locations) {
+ nodes.add(location);
+ }
+ } else {
+ throw new IllegalStateException("A node domain can only take absolute location constraints.");
+ }
}
@Override
public boolean sameAs(INodeDomain domain) {
- return true;
+ if (!(domain instanceof DefaultNodeGroupDomain)) {
+ return false;
+ }
+ DefaultNodeGroupDomain nodeDomain = (DefaultNodeGroupDomain) domain;
+ return nodes.equals(nodeDomain.nodes);
}
@Override
public String toString() {
- return "AsterixDomain(" + groupName + ")";
+ return nodes.toString();
}
@Override
public Integer cardinality() {
- return null;
+ return nodes.size();
}
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
index d872491..e298691 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
@@ -31,7 +31,6 @@
public List<ILocalStructuralProperty> getLocalProperties();
/**
- *
* @param reqd
* vector of required properties
* @param equivalenceClasses
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index 5808da1..44df740 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -83,8 +83,8 @@
@Override
public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
- for (OrderColumn orderColumn : orderColumns){
- if (varMap.containsKey(orderColumn.getColumn())){
+ for (OrderColumn orderColumn : orderColumns) {
+ if (varMap.containsKey(orderColumn.getColumn())) {
orderColumn.setColumn(varMap.get(orderColumn.getColumn()));
}
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index 711c0e7..0b8d759 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -55,9 +55,8 @@
return k;
}
- public static boolean matchLocalProperties(List<ILocalStructuralProperty> reqd,
- List<ILocalStructuralProperty> dlvd, Map<LogicalVariable, EquivalenceClass> equivalenceClasses,
- List<FunctionalDependency> fds) {
+ public static boolean matchLocalProperties(List<ILocalStructuralProperty> reqd, List<ILocalStructuralProperty> dlvd,
+ Map<LogicalVariable, EquivalenceClass> equivalenceClasses, List<FunctionalDependency> fds) {
if (reqd == null) {
return true;
}
@@ -118,7 +117,7 @@
boolean mayExpandProperties) {
INodeDomain dom1 = reqd.getNodeDomain();
INodeDomain dom2 = dlvd.getNodeDomain();
- if (dom1 != null && dom2 != null && !dom1.sameAs(dom2)) {
+ if (!dom1.sameAs(dom2)) {
return false;
}
@@ -132,10 +131,11 @@
case UNORDERED_PARTITIONED: {
UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
UnorderedPartitionedProperty ud = (UnorderedPartitionedProperty) dlvd;
- if (mayExpandProperties)
+ if (mayExpandProperties) {
return (!ud.getColumnSet().isEmpty() && ur.getColumnSet().containsAll(ud.getColumnSet()));
- else
+ } else {
return (ud.getColumnSet().equals(ur.getColumnSet()));
+ }
}
case ORDERED_PARTITIONED: {
UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
index 7ae7e62..2cad1cb 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
@@ -33,7 +33,8 @@
public static final StructuralPropertiesVector EMPTY_PROPERTIES_VECTOR = new StructuralPropertiesVector(null,
new ArrayList<ILocalStructuralProperty>());
- public StructuralPropertiesVector(IPartitioningProperty propPartitioning, List<ILocalStructuralProperty> propsLocal) {
+ public StructuralPropertiesVector(IPartitioningProperty propPartitioning,
+ List<ILocalStructuralProperty> propsLocal) {
this.propPartitioning = propPartitioning;
this.propsLocal = propsLocal;
}
@@ -63,7 +64,6 @@
}
/**
- *
* @param reqd
* vector of required properties
* @return a vector of properties from pvector that are not delivered by the
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
index 17e0cb3..d6a364a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
@@ -67,8 +67,8 @@
@Override
public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
- for (Map.Entry<LogicalVariable, LogicalVariable> var : varMap.entrySet()){
- if (columnSet.remove(var.getKey())){
+ for (Map.Entry<LogicalVariable, LogicalVariable> var : varMap.entrySet()) {
+ if (columnSet.remove(var.getKey())) {
columnSet.add(var.getValue());
}
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index ac2ae5c..4671a71 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -228,19 +228,13 @@
}
}
if (partitionConstraintMap.get(opDesc) == null) {
- if (opConstraint == null) {
- if (parentOp != null) {
- AlgebricksPartitionConstraint pc = partitionConstraintMap.get(parentOp);
- if (pc != null) {
- opConstraint = pc;
- } else if ((opInputs == null || opInputs.size() == 0) && finalPass) {
- opConstraint = new AlgebricksCountPartitionConstraint(1);
- }
- }
- if (opConstraint == null && finalPass) {
- opConstraint = clusterLocations;
- }
+ if (finalPass && opConstraint == null && (opInputs == null || opInputs.size() == 0)) {
+ opConstraint = new AlgebricksCountPartitionConstraint(1);
}
+ if (finalPass && opConstraint == null) {
+ opConstraint = clusterLocations;
+ }
+ // Sets up the location constraint.
if (opConstraint != null) {
partitionConstraintMap.put(opDesc, opConstraint);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index e76b486..154f4a1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -37,8 +38,10 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class AlgebricksOptimizationContext implements IOptimizationContext {
@@ -77,20 +80,22 @@
protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
private final IExpressionTypeComputer expressionTypeComputer;
private final INullableTypeComputer nullableTypeComputer;
+ private final INodeDomain defaultNodeDomain;
private final LogicalOperatorPrettyPrintVisitor prettyPrintVisitor;
public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
- PhysicalOptimizationConfig physicalOptimizationConfig) {
+ PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) {
this(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
- nullableTypeComputer, physicalOptimizationConfig, new LogicalOperatorPrettyPrintVisitor());
+ nullableTypeComputer, physicalOptimizationConfig, clusterLocations,
+ new LogicalOperatorPrettyPrintVisitor());
}
public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
- PhysicalOptimizationConfig physicalOptimizationConfig,
+ PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations,
LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
this.varCounter = varCounter;
this.expressionEvalSizeComputer = expressionEvalSizeComputer;
@@ -98,6 +103,7 @@
this.expressionTypeComputer = expressionTypeComputer;
this.nullableTypeComputer = nullableTypeComputer;
this.physicalOptimizationConfig = physicalOptimizationConfig;
+ this.defaultNodeDomain = new DefaultNodeGroupDomain(clusterLocations);
this.prettyPrintVisitor = prettyPrintVisitor;
}
@@ -316,6 +322,11 @@
}
@Override
+ public INodeDomain getComputationNodeDomain() {
+ return defaultNodeDomain;
+ }
+
+ @Override
public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() {
return prettyPrintVisitor;
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
index 996b988..ce77942 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.core.rewriter.base;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
@@ -29,5 +30,5 @@
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
- PhysicalOptimizationConfig physicalOptimizationConfig);
+ PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations);
}
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
index ae4f628..b5751fa 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
@@ -29,7 +29,7 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
@@ -145,13 +145,14 @@
}
});
builder.setTypeTraitProvider(new ITypeTraitProvider() {
+ @Override
public ITypeTraits getTypeTrait(Object type) {
return null;
}
});
builder.setPrinterProvider(PigletPrinterFactoryProvider.INSTANCE);
- builder.setExpressionRuntimeProvider(new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(
- new PigletExpressionJobGen()));
+ builder.setExpressionRuntimeProvider(
+ new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(new PigletExpressionJobGen()));
builder.setExpressionTypeComputer(new IExpressionTypeComputer() {
@Override
public Object getType(ILogicalExpression expr, IMetadataProvider<?, ?> metadataProvider,
@@ -159,6 +160,7 @@
return null;
}
});
+ builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(new String[] { "nc1", "nc2" }));
cFactory = builder.create();
metadataProvider = new PigletMetadataProvider();
}
@@ -237,9 +239,8 @@
}
PigletFileDataSource ds = new PigletFileDataSource(file, types.toArray());
rel.op = new DataSourceScanOperator(variables, ds);
- rel.op.getInputs().add(
- new MutableObject<ILogicalOperator>(previousOp == null ? new EmptyTupleSourceOperator()
- : previousOp));
+ rel.op.getInputs().add(new MutableObject<ILogicalOperator>(
+ previousOp == null ? new EmptyTupleSourceOperator() : previousOp));
return rel;
}
@@ -250,8 +251,9 @@
Relation inputRel = findInputRelation(alias, symMap);
Pair<Relation, LogicalVariable> tempInput = translateScalarExpression(inputRel, conditionNode);
Relation rel = new Relation();
- rel.op = new SelectOperator(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
- tempInput.second)), false, null);
+ rel.op = new SelectOperator(
+ new MutableObject<ILogicalExpression>(new VariableReferenceExpression(tempInput.second)), false,
+ null);
rel.op.getInputs().add(new MutableObject<ILogicalOperator>(tempInput.first.op));
rel.schema.putAll(tempInput.first.schema);
return rel;
@@ -298,7 +300,8 @@
for (ASTNode a : arguments) {
Pair<Relation, LogicalVariable> argPair = translateScalarExpression(rel, (ExpressionNode) a);
rel = argPair.first;
- argExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(argPair.second)));
+ argExprs.add(
+ new MutableObject<ILogicalExpression>(new VariableReferenceExpression(argPair.second)));
}
Relation outRel = new Relation();
outRel.schema.putAll(rel.schema);
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 8bf1ad5..53f7dbf 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -66,7 +66,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
-import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
@@ -93,8 +92,6 @@
public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
- private static final INodeDomain DEFAULT_DOMAIN = new DefaultNodeGroupDomain("__DEFAULT");
-
private PhysicalOptimizationConfig physicalOptimizationConfig;
@Override
@@ -127,7 +124,8 @@
PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(op, context);
- StructuralPropertiesVector pvector = new StructuralPropertiesVector(new RandomPartitioningProperty(null),
+ StructuralPropertiesVector pvector = new StructuralPropertiesVector(
+ new RandomPartitioningProperty(context.getComputationNodeDomain()),
new LinkedList<ILocalStructuralProperty>());
boolean changed = physOptimizeOp(opRef, pvector, false, context);
op.computeDeliveredPhysicalProperties(context);
@@ -196,7 +194,7 @@
boolean changed = false;
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
optimizeUsingConstraintsAndEquivClasses(op);
- PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required);
+ PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required, context);
IPhysicalPropertiesVector[] reqdProperties = null;
if (pr != null) {
reqdProperties = pr.getRequiredProperties();
@@ -220,7 +218,7 @@
} else {
INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain();
if (!childrenDomain.sameAs(dom2)) {
- childrenDomain = DEFAULT_DOMAIN;
+ childrenDomain = context.getComputationNodeDomain();
}
}
j++;
@@ -443,7 +441,7 @@
.getDeliveredPhysicalProperties();
addPartitioningEnforcers(op, childIndex, pp, required, deliveredByNewChild, domain, context);
} else {
- addPartitioningEnforcers(op, childIndex, pp, required, deliveredByChild, domain, context);
+ addPartitioningEnforcers(op, childIndex, pp, required, deliveredByChild, pp.getNodeDomain(), context);
AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, required, true, context);
AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n");