[NO ISSUE][COMP] Make memory requirements an operator property
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Refactor how memory requirements are maintained in a query plan
- Introduce LocalMemoryRequirements class. Its instances are held
by each physical operator and could be altered by the optimizer.
- Introduce optimizer rule SetMemoryRequirementsRule that
initializes and configures memory requirements for each operator
Change-Id: I3481ddfe163c6ce786290c540cbd05db16a7f64f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3374
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 98feafa..491911b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -18,9 +18,6 @@
*/
package org.apache.asterix.algebra.operators.physical;
-import java.util.Map;
-
-import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.declared.DataSourceId;
@@ -50,6 +47,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -68,6 +66,11 @@
* inverted-index search.
*/
public class InvertedIndexPOperator extends IndexSearchPOperator {
+
+ // variable memory, min 5 frames
+ // 1 for query + 2 for intermediate results + 1 for final result + 1 for reading an inverted list
+ public static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_TEXT_SEARCH;
+
private final boolean isPartitioned;
public InvertedIndexPOperator(IDataSourceIndex<String, DataSourceId> idx, INodeDomain domain,
@@ -86,6 +89,11 @@
}
@Override
+ public void createLocalMemoryRequirements(ILogicalOperator op) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_TEXT_SEARCH);
+ }
+
+ @Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
@@ -114,9 +122,7 @@
retainNull = true;
}
// In-memory budget (frame limit) for inverted-index search operations
- CompilerProperties compilerProp = metadataProvider.getApplicationContext().getCompilerProperties();
- Map<String, Object> queryConfig = metadataProvider.getConfig();
- int frameLimit = OptimizationConfUtil.getTextSearchNumFrames(compilerProp, queryConfig, op.getSourceLocation());
+ int frameLimit = localMemoryRequirements.getMemoryBudgetInFrames();
// Build runtime.
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> invIndexSearch =
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 6a11abf..531cbf3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -105,6 +105,7 @@
import org.apache.hyracks.algebricks.rewriter.rules.ExtractGbyExpressionsRule;
import org.apache.hyracks.algebricks.rewriter.rules.ExtractGroupByDecorVariablesRule;
import org.apache.hyracks.algebricks.rewriter.rules.FactorRedundantGroupAndDecorVarsRule;
+import org.apache.hyracks.algebricks.rewriter.rules.HybridToInMemoryHashJoinRule;
import org.apache.hyracks.algebricks.rewriter.rules.InferTypesRule;
import org.apache.hyracks.algebricks.rewriter.rules.InlineAssignIntoAggregateRule;
import org.apache.hyracks.algebricks.rewriter.rules.InlineSingleReferenceVariablesRule;
@@ -133,6 +134,7 @@
import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
import org.apache.hyracks.algebricks.rewriter.rules.ReuseWindowAggregateRule;
import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
+import org.apache.hyracks.algebricks.rewriter.rules.SetMemoryRequirementsRule;
import org.apache.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
import org.apache.hyracks.algebricks.rewriter.rules.SwitchInnerJoinBranchRule;
import org.apache.hyracks.algebricks.rewriter.rules.subplan.EliminateIsomorphicSubplanRule;
@@ -359,6 +361,9 @@
//Turned off the following rule for now not to change OptimizerTest results.
physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
+ physicalRewritesAllLevels.add(new SetMemoryRequirementsRule());
+ // must run after SetMemoryRequirementsRule
+ physicalRewritesAllLevels.add(new HybridToInMemoryHashJoinRule());
physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
physicalRewritesAllLevels.add(new CheckFullParallelSortRule());
physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule(BuiltinFunctions.RANGE_MAP,
@@ -403,11 +408,11 @@
.add(new IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));
prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
prepareForJobGenRewrites.add(new ExtractCommonOperatorsRule());
- // Re-infer all types, so that, e.g., the effect of not-is-null is
- // propagated.
+ // Re-infer all types, so that, e.g., the effect of not-is-null is propagated
prepareForJobGenRewrites.add(new ReinferAllTypesRule());
prepareForJobGenRewrites.add(new PushGroupByIntoSortRule());
prepareForJobGenRewrites.add(new SetExecutionModeRule());
+ prepareForJobGenRewrites.add(new SetMemoryRequirementsRule());
prepareForJobGenRewrites.add(new SweepIllegalNonfunctionalFunctions());
prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
return prepareForJobGenRewrites;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
index e64889d..51e536a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
@@ -35,7 +35,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
/**
* If an ORDER operator is followed by LIMIT, then we can push LIMIT into ORDER operator.
@@ -89,7 +88,6 @@
*/
private boolean pushLimitIntoOrder(Mutable<ILogicalOperator> opRef, Mutable<ILogicalOperator> opRef2,
IOptimizationContext context) throws AlgebricksException {
- PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
LimitOperator limitOp = (LimitOperator) opRef.getValue();
OrderOperator orderOp = (OrderOperator) opRef2.getValue();
@@ -106,8 +104,7 @@
// Create the new ORDER operator, set the topK value, and replace the current one.
OrderOperator newOrderOp = new OrderOperator(orderOp.getOrderExpressions(), topK);
newOrderOp.setSourceLocation(orderOp.getSourceLocation());
- newOrderOp.setPhysicalOperator(
- new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), newOrderOp.getTopK()));
+ newOrderOp.setPhysicalOperator(new StableSortPOperator(newOrderOp.getTopK()));
newOrderOp.getInputs().addAll(orderOp.getInputs());
newOrderOp.setExecutionMode(orderOp.getExecutionMode());
newOrderOp.recomputeSchema();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 69eecfd..31de7ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -130,11 +130,7 @@
}
generateMergeAggregationExpressions(gby);
-
- return new ExternalGroupByPOperator(gby.getGroupByVarList(),
- physicalOptimizationConfig.getMaxFramesForGroupBy(),
- (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
- * physicalOptimizationConfig.getFrameSize());
+ return new ExternalGroupByPOperator(gby.getGroupByVarList());
}
private void generateMergeAggregationExpressions(GroupByOperator gby) throws AlgebricksException {
@@ -252,12 +248,11 @@
boolean nestedTrivialAggregates =
winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(),
- frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
- context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates);
} else if (AnalysisUtil.hasFunctionWithProperty(winOp,
BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) {
- return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), false, false, false,
- context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), false, false,
+ false);
} else {
return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList());
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index a2c1c33..e8bf46a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -20,11 +20,11 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
public class OperatorResourcesComputer {
@@ -32,21 +32,10 @@
private static final long MAX_BUFFER_PER_CONNECTION = 1L;
private final int numComputationPartitions;
- private final long groupByMemorySize;
- private final long joinMemorySize;
- private final long sortMemorySize;
- private final long windowMemorySize;
- private final long textSearchMemorySize;
private final long frameSize;
- public OperatorResourcesComputer(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit,
- int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit, long frameSize) {
+ public OperatorResourcesComputer(int numComputationPartitions, long frameSize) {
this.numComputationPartitions = numComputationPartitions;
- this.groupByMemorySize = groupFrameLimit * frameSize;
- this.joinMemorySize = joinFrameLimit * frameSize;
- this.sortMemorySize = sortFrameLimit * frameSize;
- this.windowMemorySize = windowFrameLimit * frameSize;
- this.textSearchMemorySize = textSearchFrameLimit * frameSize;
this.frameSize = frameSize;
}
@@ -59,79 +48,25 @@
}
public long getOperatorRequiredMemory(ILogicalOperator operator) {
- switch (operator.getOperatorTag()) {
- case AGGREGATE:
- case ASSIGN:
- case DATASOURCESCAN:
- case DISTINCT:
- case DISTRIBUTE_RESULT:
- case EMPTYTUPLESOURCE:
- case DELEGATE_OPERATOR:
- case EXTERNAL_LOOKUP:
- case LIMIT:
- case MATERIALIZE:
- case NESTEDTUPLESOURCE:
- case PROJECT:
- case REPLICATE:
- case RUNNINGAGGREGATE:
- case SCRIPT:
- case SELECT:
- case SINK:
- case SPLIT:
- case SUBPLAN:
- case TOKENIZE:
- case UNIONALL:
- case UNNEST:
- case LEFT_OUTER_UNNEST:
- case UPDATE:
- case WRITE:
- case WRITE_RESULT:
- case INDEX_INSERT_DELETE_UPSERT:
- case INSERT_DELETE_UPSERT:
- case INTERSECT:
- case FORWARD:
- return getOperatorRequiredMemory(operator, frameSize);
- case LEFT_OUTER_UNNEST_MAP:
- case UNNEST_MAP:
- // Since an inverted-index search requires certain amount of memory, needs to calculate
- // the memory size differently if the given index-search is an inverted-index search.
- long unnestMapMemorySize = frameSize;
- if (isInvertedIndexSearch((AbstractUnnestMapOperator) operator)) {
- unnestMapMemorySize += textSearchMemorySize;
- }
- return getOperatorRequiredMemory(operator, unnestMapMemorySize);
- case EXCHANGE:
- return getExchangeRequiredMemory((ExchangeOperator) operator);
- case GROUP:
- return getOperatorRequiredMemory(operator, groupByMemorySize);
- case ORDER:
- return getOperatorRequiredMemory(operator, sortMemorySize);
- case INNERJOIN:
- case LEFTOUTERJOIN:
- return getOperatorRequiredMemory(operator, joinMemorySize);
- case WINDOW:
- return getWindowRequiredMemory((WindowOperator) operator);
- default:
- throw new IllegalStateException("Unrecognized operator: " + operator.getOperatorTag());
+ if (operator.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+ return getExchangeRequiredMemory((ExchangeOperator) operator);
+ } else {
+ IPhysicalOperator physOp = ((AbstractLogicalOperator) operator).getPhysicalOperator();
+ return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements());
}
}
- private long getOperatorRequiredMemory(ILogicalOperator op, long memorySize) {
- if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
- || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+ private long getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode, long memorySize) {
+ if (opExecMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+ || opExecMode == AbstractLogicalOperator.ExecutionMode.LOCAL) {
return memorySize * numComputationPartitions;
}
return memorySize;
}
- private boolean isInvertedIndexSearch(AbstractUnnestMapOperator op) {
- IPhysicalOperator physicalOperator = op.getPhysicalOperator();
- final PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag();
- if (physicalOperatorTag == PhysicalOperatorTag.LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH
- || physicalOperatorTag == PhysicalOperatorTag.SINGLE_PARTITION_INVERTED_INDEX_SEARCH) {
- return true;
- }
- return false;
+ private long getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode,
+ LocalMemoryRequirements memoryReqs) {
+ return getOperatorRequiredMemory(opExecMode, memoryReqs.getMemoryBudgetInBytes(frameSize));
}
private long getExchangeRequiredMemory(ExchangeOperator op) {
@@ -139,16 +74,8 @@
final PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag();
if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE
|| physicalOperatorTag == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
- return getOperatorRequiredMemory(op, frameSize);
+ return getOperatorRequiredMemory(op.getExecutionMode(), frameSize);
}
return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize;
}
-
- private long getWindowRequiredMemory(WindowOperator op) {
- // memory budget configuration only applies to window operators that materialize partitions (non-streaming)
- // streaming window operators only need 2 frames: output + (conservative estimate) last frame partition columns
- long memorySize = op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW_STREAM ? 2 * frameSize
- : windowMemorySize;
- return getOperatorRequiredMemory(op, memorySize);
- }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index 0f4c4c0..ba11956 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -55,14 +55,8 @@
AlgebricksAbsolutePartitionConstraint computationLocations,
PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException {
final int frameSize = physicalOptimizationConfig.getFrameSize();
- final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
- final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy();
- final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin();
- final int windowFrameLimit = physicalOptimizationConfig.getMaxFramesForWindow();
- final int textSearchFrameLimit = physicalOptimizationConfig.getMaxFramesForTextSearch();
final List<PlanStage> planStages = getStages(plan);
- return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, sortFrameLimit,
- groupFrameLimit, joinFrameLimit, windowFrameLimit, textSearchFrameLimit, frameSize);
+ return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize);
}
public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException {
@@ -74,15 +68,13 @@
}
public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
- int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit,
int frameSize) {
- final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, sortFrameLimit,
- groupFrameLimit, joinFrameLimit, windowFrameLimit, textSearchFrameLimit, frameSize);
+ final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, frameSize);
final IClusterCapacity clusterCapacity = new ClusterCapacity();
- final Long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
+ final long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
.orElseThrow(IllegalStateException::new);
clusterCapacity.setAggregatedMemoryByteSize(maxRequiredMemory);
- final Integer maxRequireCores = stages.stream().mapToInt(stage -> stage.getRequiredCores(computer)).max()
+ final int maxRequireCores = stages.stream().mapToInt(stage -> stage.getRequiredCores(computer)).max()
.orElseThrow(IllegalStateException::new);
clusterCapacity.setAggregatedCores(maxRequireCores);
return clusterCapacity;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index 094009e..b8beccc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
@@ -51,10 +52,20 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.junit.Assert;
@@ -64,9 +75,7 @@
private static final Set<LogicalOperatorTag> BLOCKING_OPERATORS =
new HashSet<>(Arrays.asList(INNERJOIN, LEFTOUTERJOIN, ORDER));
- private static final long MEMORY_BUDGET = 33554432L;
private static final int FRAME_SIZE = 32768;
- private static final int FRAME_LIMIT = (int) (MEMORY_BUDGET / FRAME_SIZE);
private static final int PARALLELISM = 10;
private static final long MAX_BUFFER_PER_CONNECTION = 1L;
@@ -74,9 +83,11 @@
public void noBlockingPlan() throws AlgebricksException {
EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
ets.setExecutionMode(UNPARTITIONED);
+ ets.setPhysicalOperator(new EmptyTupleSourcePOperator());
AssignOperator assignOperator = new AssignOperator(Collections.emptyList(), null);
assignOperator.setExecutionMode(UNPARTITIONED);
+ assignOperator.setPhysicalOperator(new AssignPOperator());
assignOperator.getInputs().add(new MutableObject<>(ets));
ExchangeOperator exchange = new ExchangeOperator();
@@ -86,8 +97,9 @@
DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
resultOperator.setExecutionMode(UNPARTITIONED);
+ resultOperator.setPhysicalOperator(new DistributeResultPOperator());
resultOperator.getInputs().add(new MutableObject<>(exchange));
- ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+ ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
List<PlanStage> stages = ResourceUtils.getStages(plan);
// ensure a single stage plan
@@ -103,9 +115,11 @@
public void testNonBlockingGroupByOrderBy() throws AlgebricksException {
EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
ets.setExecutionMode(PARTITIONED);
+ ets.setPhysicalOperator(new EmptyTupleSourcePOperator());
DataSourceScanOperator scanOperator = new DataSourceScanOperator(Collections.emptyList(), null);
scanOperator.setExecutionMode(PARTITIONED);
+ scanOperator.setPhysicalOperator(new DataSourceScanPOperator(null));
scanOperator.getInputs().add(new MutableObject<>(ets));
ExchangeOperator exchange = new ExchangeOperator();
@@ -115,18 +129,19 @@
GroupByOperator groupByOperator = new GroupByOperator();
groupByOperator.setExecutionMode(PARTITIONED);
- groupByOperator
- .setPhysicalOperator(new PreclusteredGroupByPOperator(Collections.emptyList(), true, FRAME_LIMIT));
+ groupByOperator.setPhysicalOperator(new PreclusteredGroupByPOperator(Collections.emptyList(), true));
groupByOperator.getInputs().add(new MutableObject<>(exchange));
OrderOperator orderOperator = new OrderOperator();
orderOperator.setExecutionMode(PARTITIONED);
+ orderOperator.setPhysicalOperator(new StableSortPOperator());
orderOperator.getInputs().add(new MutableObject<>(groupByOperator));
DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
resultOperator.setExecutionMode(PARTITIONED);
+ resultOperator.setPhysicalOperator(new DistributeResultPOperator());
resultOperator.getInputs().add(new MutableObject<>(orderOperator));
- ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+ ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
final List<PlanStage> stages = ResourceUtils.getStages(plan);
validateStages(stages, ets, exchange, groupByOperator, orderOperator, resultOperator);
@@ -136,8 +151,10 @@
// dominating stage should have orderBy, orderBy's input (groupby), groupby's input (exchange),
// exchange's input (scanOperator), and scanOperator's input (ets)
- long orderOperatorRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
- long groupByOperatorRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long orderOperatorRequiredMemory =
+ AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT * FRAME_SIZE * PARALLELISM;
+ long groupByOperatorRequiredMemory =
+ AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY * FRAME_SIZE * PARALLELISM;
long exchangeRequiredMemory = PARALLELISM * FRAME_SIZE;
long scanOperatorRequiredMemory = PARALLELISM * FRAME_SIZE;
long etsRequiredMemory = FRAME_SIZE * PARALLELISM;
@@ -151,20 +168,26 @@
public void testJoinGroupby() throws AlgebricksException {
EmptyTupleSourceOperator ets1 = new EmptyTupleSourceOperator();
ets1.setExecutionMode(PARTITIONED);
+ ets1.setPhysicalOperator(new EmptyTupleSourcePOperator());
DataSourceScanOperator scanOperator1 = new DataSourceScanOperator(Collections.emptyList(), null);
scanOperator1.setExecutionMode(PARTITIONED);
+ scanOperator1.setPhysicalOperator(new DataSourceScanPOperator(null));
scanOperator1.getInputs().add(new MutableObject<>(ets1));
EmptyTupleSourceOperator ets2 = new EmptyTupleSourceOperator();
- ets1.setExecutionMode(PARTITIONED);
+ ets2.setExecutionMode(PARTITIONED);
+ ets2.setPhysicalOperator(new EmptyTupleSourcePOperator());
DataSourceScanOperator scanOperator2 = new DataSourceScanOperator(Collections.emptyList(), null);
scanOperator2.setExecutionMode(PARTITIONED);
+ scanOperator2.setPhysicalOperator(new DataSourceScanPOperator(null));
scanOperator2.getInputs().add(new MutableObject<>(ets2));
InnerJoinOperator firstJoin = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
firstJoin.setExecutionMode(PARTITIONED);
+ firstJoin.setPhysicalOperator(new NestedLoopJoinPOperator(firstJoin.getJoinKind(),
+ AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
firstJoin.getInputs().add(new MutableObject<>(scanOperator1));
firstJoin.getInputs().add(new MutableObject<>(scanOperator2));
@@ -174,11 +197,11 @@
exchangeOperator1.getInputs().add(new MutableObject<>(firstJoin));
EmptyTupleSourceOperator ets3 = new EmptyTupleSourceOperator();
- ets1.setExecutionMode(PARTITIONED);
+ ets3.setExecutionMode(PARTITIONED);
+ ets3.setPhysicalOperator(new EmptyTupleSourcePOperator());
GroupByOperator groupByOperator = new GroupByOperator();
- groupByOperator
- .setPhysicalOperator(new ExternalGroupByPOperator(Collections.emptyList(), FRAME_LIMIT, FRAME_LIMIT));
+ groupByOperator.setPhysicalOperator(new ExternalGroupByPOperator(Collections.emptyList()));
groupByOperator.setExecutionMode(LOCAL);
groupByOperator.getInputs().add(new MutableObject<>(ets3));
@@ -189,13 +212,16 @@
LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
secondJoin.setExecutionMode(PARTITIONED);
+ secondJoin.setPhysicalOperator(new NestedLoopJoinPOperator(secondJoin.getJoinKind(),
+ AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
secondJoin.getInputs().add(new MutableObject<>(exchangeOperator1));
secondJoin.getInputs().add(new MutableObject<>(exchangeOperator2));
DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
resultOperator.setExecutionMode(PARTITIONED);
+ resultOperator.setPhysicalOperator(new DistributeResultPOperator());
resultOperator.getInputs().add(new MutableObject<>(secondJoin));
- ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+ ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
List<PlanStage> stages = ResourceUtils.getStages(plan);
final int expectedStages = 4;
@@ -207,9 +233,9 @@
// resultOperator, its input (secondJoin), secondJoin's first input (exchangeOperator1), exchangeOperator1's
// input (firstJoin), firstJoin's first input (scanOperator1), and scanOperator1's input (ets1)
long resultOperatorRequiredMemory = FRAME_SIZE * PARALLELISM;
- long secondJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long secondJoinRequiredMemory = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN * FRAME_SIZE * PARALLELISM;
long exchangeOperator1RequiredMemory = 2 * MAX_BUFFER_PER_CONNECTION * PARALLELISM * PARALLELISM * FRAME_SIZE;
- long firstJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long firstJoinRequiredMemory = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN * FRAME_SIZE * PARALLELISM;
long scanOperator1RequiredMemory = FRAME_SIZE * PARALLELISM;
long ets1RequiredMemory = FRAME_SIZE * PARALLELISM;
@@ -222,34 +248,40 @@
public void testReplicateSortJoin() throws AlgebricksException {
EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
ets.setExecutionMode(PARTITIONED);
+ ets.setPhysicalOperator(new EmptyTupleSourcePOperator());
DataSourceScanOperator scanOperator = new DataSourceScanOperator(Collections.emptyList(), null);
scanOperator.setExecutionMode(PARTITIONED);
+ scanOperator.setPhysicalOperator(new DataSourceScanPOperator(null));
scanOperator.getInputs().add(new MutableObject<>(ets));
ReplicateOperator replicateOperator = new ReplicateOperator(2);
replicateOperator.setExecutionMode(PARTITIONED);
+ replicateOperator.setPhysicalOperator(new ReplicatePOperator());
replicateOperator.getInputs().add(new MutableObject<>(scanOperator));
OrderOperator order1 = new OrderOperator();
order1.setExecutionMode(PARTITIONED);
- order1.setPhysicalOperator(new OneToOneExchangePOperator());
+ order1.setPhysicalOperator(new StableSortPOperator());
order1.getInputs().add(new MutableObject<>(replicateOperator));
OrderOperator order2 = new OrderOperator();
order2.setExecutionMode(PARTITIONED);
- order2.setPhysicalOperator(new OneToOneExchangePOperator());
+ order2.setPhysicalOperator(new StableSortPOperator());
order2.getInputs().add(new MutableObject<>(replicateOperator));
LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
secondJoin.setExecutionMode(PARTITIONED);
+ secondJoin.setPhysicalOperator(new NestedLoopJoinPOperator(secondJoin.getJoinKind(),
+ AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
secondJoin.getInputs().add(new MutableObject<>(order1));
secondJoin.getInputs().add(new MutableObject<>(order2));
DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
resultOperator.setExecutionMode(PARTITIONED);
+ resultOperator.setPhysicalOperator(new DistributeResultPOperator());
resultOperator.getInputs().add(new MutableObject<>(secondJoin));
- ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+ ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
List<PlanStage> stages = ResourceUtils.getStages(plan);
final int expectedStages = 3;
@@ -257,14 +289,14 @@
validateStages(stages);
// dominating stage should have the following operators:
- // secondJoin, secondJoin's second input (order2), order2's input (replicate),
+ // order1, order2, order1 and order2's input (replicate),
// replicate's input (scanOperator), scanOperator's input (ets)
- long secondJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
- long order2RequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long order1RequiredMemory = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT * FRAME_SIZE * PARALLELISM;
+ long order2RequiredMemory = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT * FRAME_SIZE * PARALLELISM;
long replicateOperatorRequiredMemory = FRAME_SIZE * PARALLELISM;
long scanOperator1RequiredMemory = FRAME_SIZE * PARALLELISM;
long etsRequiredMemory = FRAME_SIZE * PARALLELISM;
- long expectedMemory = secondJoinRequiredMemory + order2RequiredMemory + replicateOperatorRequiredMemory
+ long expectedMemory = order1RequiredMemory + order2RequiredMemory + replicateOperatorRequiredMemory
+ scanOperator1RequiredMemory + etsRequiredMemory;
assertRequiredMemory(stages, expectedMemory);
}
@@ -300,8 +332,13 @@
}
private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) {
- final IClusterCapacity clusterCapacity = ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM,
- FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE);
+ for (PlanStage stage : stages) {
+ for (ILogicalOperator op : stage.getOperators()) {
+ ((AbstractLogicalOperator) op).getPhysicalOperator().createLocalMemoryRequirements(op);
+ }
+ }
+ final IClusterCapacity clusterCapacity =
+ ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE);
Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), expectedMemory);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index e8643cd..b29cb61 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -23,6 +23,10 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -30,13 +34,11 @@
public class OptimizationConfUtil {
- private static final int MIN_FRAME_LIMIT_FOR_SORT = 3;
- private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 4;
- private static final int MIN_FRAME_LIMIT_FOR_JOIN = 5;
- // 1 (output) + 1 (input copy) + 1 (partition writer) + 2 (seekable partition reader)
- private static final int MIN_FRAME_LIMIT_FOR_WINDOW = 5;
- // one for query, two for intermediate results, one for final result, and one for reading an inverted list
- private static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5;
+ private static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
+ private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
+ private static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
+ private static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
+ public static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5; // see InvertedIndexPOperator
private OptimizationConfUtil() {
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index a88ec64..a5e22cd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -51,6 +52,10 @@
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
throws AlgebricksException;
+ public LocalMemoryRequirements getLocalMemoryRequirements();
+
+ public void createLocalMemoryRequirements(ILogicalOperator op);
+
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException;
@@ -72,10 +77,10 @@
public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op);
/*
- * This is needed to have a kind of cost based decision on whether to merge the shared subplans and materialize the result.
- * If the subgraph whose result we would like to materialize has an operator that is computationally expensive, we assume
- * it is cheaper to materialize the result of this subgraph and read from the file rather than recomputing it.
+ * This is needed to have a kind of cost based decision on whether to merge the shared subplans and materialize
+ * the result. If the subgraph whose result we would like to materialize has an operator that is computationally
+ * expensive, we assume it is cheaper to materialize the result of this subgraph and read from the file rather
+ * than recomputing it.
*/
public boolean expensiveThanMaterialization();
-
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
index ce6dedc..17c60b9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
@@ -21,17 +21,19 @@
import java.util.List;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
public abstract class AbstractGroupByPOperator extends AbstractPhysicalOperator {
+ // variable memory, min 4 frames
+ public static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 4;
+
protected List<LogicalVariable> columnList;
- protected final int framesLimit;
-
- protected AbstractGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
+ protected AbstractGroupByPOperator(List<LogicalVariable> columnList) {
this.columnList = columnList;
- this.framesLimit = framesLimit;
}
public List<LogicalVariable> getGroupByColumns() {
@@ -48,6 +50,11 @@
}
@Override
+ public void createLocalMemoryRequirements(ILogicalOperator op) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_GROUP_BY);
+ }
+
+ @Override
public String toString() {
return getOperatorTag().toString() + columnList;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
index aea9b3e..c300392 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
@@ -21,9 +21,13 @@
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator {
+ // variable memory, min 5 frames
+ public static final int MIN_FRAME_LIMIT_FOR_JOIN = 5;
+
public enum JoinPartitioningType {
PAIRWISE,
BROADCAST
@@ -56,4 +60,9 @@
public boolean expensiveThanMaterialization() {
return true;
}
+
+ @Override
+ public void createLocalMemoryRequirements(ILogicalOperator op) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_JOIN);
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 29d6037..96fbe3e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -50,6 +51,7 @@
public abstract class AbstractPhysicalOperator implements IPhysicalOperator {
protected IPhysicalPropertiesVector deliveredProperties;
+ protected LocalMemoryRequirements localMemoryRequirements;
private boolean disableJobGenBelow = false;
private Object hostQueryContext;
@@ -88,6 +90,16 @@
}
@Override
+ public LocalMemoryRequirements getLocalMemoryRequirements() {
+ return localMemoryRequirements;
+ }
+
+ @Override
+ public void createLocalMemoryRequirements(ILogicalOperator op) {
+ localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(1);
+ }
+
+ @Override
public void disableJobGenBelowMe() {
this.disableJobGenBelow = true;
}
@@ -141,14 +153,13 @@
List<List<AlgebricksPipeline>> subplans = new ArrayList<>(npOp.getNestedPlans().size());
PlanCompiler pc = new PlanCompiler(context);
for (ILogicalPlan p : npOp.getNestedPlans()) {
- subplans.add(buildPipelineWithProjection(p, outerPlanSchema, npOp, opSchema, pc));
+ subplans.add(buildPipelineWithProjection(p, outerPlanSchema, opSchema, pc));
}
return subplans;
}
private List<AlgebricksPipeline> buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema,
- AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc)
- throws AlgebricksException {
+ IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
if (p.getRoots().size() > 1) {
throw new NotImplementedException("Nested plans with several roots are not supported.");
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 23a411c..fdb7347 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -59,8 +59,8 @@
public abstract class AbstractPreclusteredGroupByPOperator extends AbstractGroupByPOperator {
- protected AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
- super(columnList, framesLimit);
+ protected AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+ super(columnList);
}
// Obs: We don't propagate properties corresponding to decors, since they
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 81852d4..eaec772 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
@@ -49,12 +50,13 @@
public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator {
- final int maxNumberOfFrames;
+ // variable memory, min 3 frames
+ public static final int MIN_FRAME_LIMIT_FOR_SORT = 3;
+
OrderColumn[] sortColumns;
ILocalStructuralProperty orderProp;
- AbstractStableSortPOperator(int maxNumberOfFrames) {
- this.maxNumberOfFrames = maxNumberOfFrames;
+ AbstractStableSortPOperator() {
}
public OrderColumn[] getSortColumns() {
@@ -163,4 +165,9 @@
&& clusterDomain.cardinality() != null && clusterDomain.cardinality() > 1
&& ctx.getPhysicalOptimizationConfig().getSortParallel();
}
+
+ @Override
+ public void createLocalMemoryRequirements(ILogicalOperator op) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_SORT);
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 927beae..bf0a824 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -68,11 +68,8 @@
public class ExternalGroupByPOperator extends AbstractGroupByPOperator {
- private final long inputSize;
-
- public ExternalGroupByPOperator(List<LogicalVariable> columnList, int framesLimit, long fileSize) {
- super(columnList, framesLimit);
- this.inputSize = fileSize;
+ public ExternalGroupByPOperator(List<LogicalVariable> columnList) {
+ super(columnList);
}
@Override
@@ -238,11 +235,14 @@
JobGenHelper.variablesToAscNormalizedKeyComputerFactory(gbyCols, aggOpInputEnv, context);
// Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
- int memoryBudgetInBytes = context.getFrameSize() * framesLimit;
+ int frameSize = context.getFrameSize();
+ long memoryBudgetInBytes = localMemoryRequirements.getMemoryBudgetInBytes(frameSize);
int groupByColumnsCount = gby.getGroupByList().size() + numFds;
int hashTableSize = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(memoryBudgetInBytes,
- groupByColumnsCount, context.getFrameSize());
+ groupByColumnsCount, frameSize);
+ int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
+ long inputSize = framesLimit * (long) frameSize;
ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
keyAndDecFields, framesLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory,
mergeFactory, recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 6d8b6e3..8a56fbb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -60,8 +60,6 @@
public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
- // The maximum number of in-memory frames that this hash join can use.
- private final int memSizeInFrames;
private final int maxInputBuildSizeInFrames;
private final double fudgeFactor;
@@ -69,17 +67,15 @@
public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
- int memSizeInFrames, int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
+ int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
- this.memSizeInFrames = memSizeInFrames;
this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
this.fudgeFactor = fudgeFactor;
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
+ partitioningType + ", List<LogicalVariable>=" + sideLeftOfEqualities + ", List<LogicalVariable>="
- + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames
- + ", int maxInputSize0InFrames=" + maxInputSizeInFrames + ", int aveRecordsPerFrame="
- + aveRecordsPerFrame + ", double fudgeFactor=" + fudgeFactor + ".");
+ + sideRightOfEqualities + ", int maxInputSize0InFrames=" + maxInputSizeInFrames
+ + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", double fudgeFactor=" + fudgeFactor + ".");
}
}
@@ -97,10 +93,6 @@
return fudgeFactor;
}
- public int getMemSizeInFrames() {
- return memSizeInFrames;
- }
-
@Override
public String toString() {
return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
@@ -156,11 +148,12 @@
IBinaryHashFunctionFamily[] rightHashFunFamilies, IBinaryComparatorFactory[] leftCompFactories,
IBinaryComparatorFactory[] rightCompFactories, IPredicateEvaluatorFactory predEvaluatorFactory,
RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) {
+ int memSizeInFrames = localMemoryRequirements.getMemoryBudgetInFrames();
switch (kind) {
case INNER:
- return new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
- maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies,
- rightHashFunFamilies, leftCompFactories, rightCompFactories, recDescriptor,
+ return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
+ getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies,
+ leftCompFactories, rightCompFactories, recDescriptor,
new JoinMultiComparatorFactory(leftCompFactories, keysLeft, keysRight),
new JoinMultiComparatorFactory(rightCompFactories, keysRight, keysLeft), predEvaluatorFactory);
case LEFT_OUTER:
@@ -168,9 +161,9 @@
for (int j = 0; j < nonMatchWriterFactories.length; j++) {
nonMatchWriterFactories[j] = context.getMissingWriterFactory();
}
- return new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
- maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies,
- rightHashFunFamilies, leftCompFactories, rightCompFactories, recDescriptor,
+ return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
+ getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies,
+ leftCompFactories, rightCompFactories, recDescriptor,
new JoinMultiComparatorFactory(leftCompFactories, keysLeft, keysRight),
new JoinMultiComparatorFactory(rightCompFactories, keysRight, keysLeft), predEvaluatorFactory,
true, nonMatchWriterFactories);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 152fcc6..90ae4cd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -50,19 +50,15 @@
public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
private final int tableSize;
- // The maximum number of in-memory frames that this hash join can use.
- private final int memSizeInFrames;
/**
* builds on the first operator and probes on the second.
*/
public InMemoryHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
- List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize,
- int memSizeInFrames) {
+ List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize) {
super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
this.tableSize = tableSize;
- this.memSizeInFrames = memSizeInFrames;
}
@Override
@@ -104,14 +100,16 @@
IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
context.getPredicateEvaluatorFactoryProvider();
- IPredicateEvaluatorFactory predEvaluatorFactory = (predEvaluatorFactoryProvider == null ? null
- : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+ IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider == null ? null
+ : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
RecordDescriptor recDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IOperatorDescriptor opDesc;
+ int memSizeInFrames = localMemoryRequirements.getMemoryBudgetInFrames();
+
switch (kind) {
case INNER:
opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, leftHashFunFactories,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
index 350bcfb..2d70abe 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
@@ -39,8 +39,8 @@
public class MicroPreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
- public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
- super(columnList, framesLimit);
+ public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+ super(columnList);
}
@Override
@@ -72,6 +72,7 @@
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+ int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keys,
comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null, framesLimit);
runtime.setSourceLocation(gby.getSourceLocation());
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
index 413c1a4..403ca57 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
@@ -39,8 +39,7 @@
public class MicroStableSortPOperator extends AbstractStableSortPOperator {
- public MicroStableSortPOperator(int maxNumberOfFrames) {
- super(maxNumberOfFrames);
+ public MicroStableSortPOperator() {
}
@Override
@@ -80,6 +79,7 @@
i++;
}
+ int maxNumberOfFrames = localMemoryRequirements.getMemoryBudgetInFrames();
IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortFields, nkcf, comps, null, maxNumberOfFrames);
builder.contributeMicroOperator(op, runtime, recDescriptor);
ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
index 048b129..524b336 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
@@ -63,11 +63,8 @@
*/
public class NestedLoopJoinPOperator extends AbstractJoinPOperator {
- private final int memSize;
-
- public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
+ public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType) {
super(kind, partitioningType);
- this.memSize = memSize;
}
@Override
@@ -141,6 +138,7 @@
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IOperatorDescriptor opDesc;
+ int memSize = localMemoryRequirements.getMemoryBudgetInFrames();
switch (kind) {
case INNER:
opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
@@ -187,7 +185,6 @@
}
public static class TuplePairEvaluator implements ITuplePairComparator {
- private final IHyracksTaskContext ctx;
private IScalarEvaluator condEvaluator;
private final IPointable p;
private final CompositeFrameTupleReference compositeTupleRef;
@@ -197,7 +194,6 @@
public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
IBinaryBooleanInspector binaryBooleanInspector) throws HyracksDataException {
- this.ctx = ctx;
this.condEvaluator = condFactory.createScalarEvaluator(ctx);
this.binaryBooleanInspector = binaryBooleanInspector;
this.leftRef = new FrameTupleReference();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index b6faa36..c6b49c2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -44,8 +44,8 @@
private final boolean groupAll;
- public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll, int framesLimit) {
- super(columnList, framesLimit);
+ public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll) {
+ super(columnList);
this.groupAll = groupAll;
}
@@ -85,6 +85,7 @@
RecordDescriptor recordDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+ int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
comparatorFactories, aggregatorFactory, recordDescriptor, groupAll, framesLimit);
opDesc.setSourceLocation(gby.getSourceLocation());
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index af8161f..20e197f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -69,8 +69,8 @@
private final OrderColumn[] orderColumns;
- public SortGroupByPOperator(List<LogicalVariable> columnList, int framesLimit, OrderColumn[] orderColumns) {
- super(columnList, framesLimit);
+ public SortGroupByPOperator(List<LogicalVariable> columnList, OrderColumn[] orderColumns) {
+ super(columnList);
this.orderColumns = orderColumns;
}
@@ -249,6 +249,7 @@
normalizedKeyFactory =
orderColumns[0].getOrder() == OrderKind.ASC ? nkcfProvider.getNormalizedKeyComputerFactory(type, true)
: nkcfProvider.getNormalizedKeyComputerFactory(type, false);
+ int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, framesLimit, keys,
keyAndDecFields, normalizedKeyFactory, compFactories, aggregatorFactory, mergeFactory,
partialAggRecordDescriptor, recordDescriptor, false);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 9567e5b..93c5c3b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -49,12 +49,11 @@
private final int topK;
- public StableSortPOperator(int maxNumberOfFrames) {
- this(maxNumberOfFrames, -1);
+ public StableSortPOperator() {
+ this(-1);
}
- public StableSortPOperator(int maxNumberOfFrames, int topK) {
- super(maxNumberOfFrames);
+ public StableSortPOperator(int topK) {
this.topK = topK;
}
@@ -98,6 +97,7 @@
i++;
}
+ int maxNumberOfFrames = localMemoryRequirements.getMemoryBudgetInFrames();
AbstractSorterOperatorDescriptor sortOpDesc;
// topK == -1 means that a topK value is not provided.
if (topK == -1) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 23853e8..2a0b052 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -21,9 +21,11 @@
import java.util.List;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
@@ -38,23 +40,22 @@
public final class WindowPOperator extends AbstractWindowPOperator {
+ // variable memory, min 5 frames:
+ // 1 (output) + 1 (input copy conservative) + 1 (partition writer) + 2 (seekable partition reader)
+ public static final int MIN_FRAME_LIMIT_FOR_WINDOW = 5;
+
private final boolean frameStartIsMonotonic;
private final boolean frameEndIsMonotonic;
private final boolean nestedTrivialAggregates;
- // The maximum number of in-memory frames that this operator can use.
- private final int memSizeInFrames;
-
public WindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns,
- boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, boolean nestedTrivialAggregates,
- int memSizeInFrames) {
+ boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, boolean nestedTrivialAggregates) {
super(partitionColumns, orderColumns);
this.frameStartIsMonotonic = frameStartIsMonotonic;
this.frameEndIsMonotonic = frameEndIsMonotonic;
this.nestedTrivialAggregates = nestedTrivialAggregates;
- this.memSizeInFrames = memSizeInFrames;
}
@Override
@@ -63,6 +64,11 @@
}
@Override
+ public void createLocalMemoryRequirements(ILogicalOperator op) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_WINDOW);
+ }
+
+ @Override
protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
@@ -74,6 +80,8 @@
IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) {
+ int memSizeInFrames = localMemoryRequirements.getMemoryBudgetInFrames();
+
// special cases
if (!winOp.hasNestedPlans()) {
return new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
index 33b47ec..68476d1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -21,9 +21,11 @@
import java.util.List;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
@@ -35,6 +37,9 @@
public final class WindowStreamPOperator extends AbstractWindowPOperator {
+ // fixed memory, 2 frames: 1 (output) + 1 (input copy conservative)
+ public static final int MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM = 2;
+
public WindowStreamPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) {
super(partitionColumns, orderColumns);
}
@@ -45,6 +50,11 @@
}
@Override
+ public void createLocalMemoryRequirements(ILogicalOperator op) {
+ localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM);
+ }
+
+ @Override
protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalMemoryRequirements.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalMemoryRequirements.java
new file mode 100644
index 0000000..38e318c
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalMemoryRequirements.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.properties;
+
+public abstract class LocalMemoryRequirements {
+
+ public abstract int getMinMemoryBudgetInFrames();
+
+ public abstract int getMemoryBudgetInFrames();
+
+ public abstract void setMemoryBudgetInFrames(int value);
+
+ public final long getMemoryBudgetInBytes(long frameSize) {
+ return frameSize * getMemoryBudgetInFrames();
+ }
+
+ public static LocalMemoryRequirements fixedMemoryBudget(int memBudgetInFrames) {
+ if (memBudgetInFrames < 0) {
+ throw new IllegalArgumentException(String.valueOf(memBudgetInFrames));
+ }
+ return memBudgetInFrames == FixedMemoryBudget.ONE_FRAME.memBudgetInFrames ? FixedMemoryBudget.ONE_FRAME
+ : new FixedMemoryBudget(memBudgetInFrames);
+ }
+
+ private static final class FixedMemoryBudget extends LocalMemoryRequirements {
+
+ private static final FixedMemoryBudget ONE_FRAME = new FixedMemoryBudget(1);
+
+ private final int memBudgetInFrames;
+
+ private FixedMemoryBudget(int memBudgetInFrames) {
+ this.memBudgetInFrames = memBudgetInFrames;
+ }
+
+ @Override
+ public int getMinMemoryBudgetInFrames() {
+ return memBudgetInFrames;
+ }
+
+ @Override
+ public int getMemoryBudgetInFrames() {
+ return memBudgetInFrames;
+ }
+
+ @Override
+ public void setMemoryBudgetInFrames(int value) {
+ if (value != memBudgetInFrames) {
+ throw new IllegalArgumentException("Got " + value + ", expected " + memBudgetInFrames);
+ }
+ }
+ }
+
+ public static LocalMemoryRequirements variableMemoryBudget(int minMemBudgetInFrames) {
+ return new VariableMemoryBudget(minMemBudgetInFrames);
+ }
+
+ private static final class VariableMemoryBudget extends LocalMemoryRequirements {
+
+ private final int minMemBudgetInFrames;
+
+ private int memBudgetInFrames;
+
+ private VariableMemoryBudget(int minMemBudgetInFrames) {
+ this.memBudgetInFrames = this.minMemBudgetInFrames = minMemBudgetInFrames;
+ }
+
+ @Override
+ public int getMinMemoryBudgetInFrames() {
+ return minMemBudgetInFrames;
+ }
+
+ @Override
+ public int getMemoryBudgetInFrames() {
+ return memBudgetInFrames;
+ }
+
+ @Override
+ public void setMemoryBudgetInFrames(int value) {
+ if (value < minMemBudgetInFrames) {
+ throw new IllegalArgumentException("Got " + value + ", expected " + minMemBudgetInFrames + " or more");
+ }
+ memBudgetInFrames = value;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index a011abf..749a63c 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -100,7 +100,6 @@
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
@@ -110,7 +109,6 @@
private static final String HASH_MERGE = "hash_merge";
private static final String TRUE_CONSTANT = "true";
- private PhysicalOptimizationConfig physicalOptimizationConfig;
private final FunctionIdentifier rangeMapFunction;
private final FunctionIdentifier localSamplingFun;
private final FunctionIdentifier typePropagatingFun;
@@ -147,7 +145,6 @@
// These are actually logical constraints, so they could be pre-computed
// somewhere else, too.
- physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Optimizing operator " + op.getPhysicalOperator() + ".\n");
}
@@ -213,7 +210,7 @@
boolean loggerTraceEnabled = AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
// The child index of the child operator to optimize first.
- int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
+ int startChildIndex = getStartChildIndex(op, pr, nestedPlan);
IPartitioningProperty firstDeliveredPartitioning = null;
// Enforce data properties in a top-down manner.
for (j = 0; j < op.getInputs().size(); j++) {
@@ -328,8 +325,7 @@
// Gets the index of a child to start top-down data property enforcement.
// If there is a partitioning-compatible child with the operator in opRef,
// start from this child; otherwise, start from child zero.
- private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
- IOptimizationContext context) throws AlgebricksException {
+ private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan) {
IPhysicalPropertiesVector[] reqdProperties = null;
if (pr != null) {
reqdProperties = pr.getRequiredProperties();
@@ -468,7 +464,7 @@
if (pp == null || pp.getPartitioningType() == PartitioningType.UNPARTITIONED) {
addLocalEnforcers(op, childIndex, diffPropertiesVector.getLocalProperties(), nestedPlan, context);
IPhysicalPropertiesVector deliveredByNewChild =
- ((AbstractLogicalOperator) op.getInputs().get(0).getValue()).getDeliveredPhysicalProperties();
+ op.getInputs().get(0).getValue().getDeliveredPhysicalProperties();
if (!nestedPlan) {
addPartitioningEnforcers(op, childIndex, pp, required, deliveredByNewChild, domain, context);
}
@@ -555,9 +551,9 @@
oo.setSourceLocation(sourceLoc);
oo.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
if (isMicroOp) {
- oo.setPhysicalOperator(new MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
+ oo.setPhysicalOperator(new MicroStableSortPOperator());
} else {
- oo.setPhysicalOperator(new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
+ oo.setPhysicalOperator(new StableSortPOperator());
}
oo.getInputs().add(topOp);
context.computeAndSetTypeEnvironmentForOperator(oo);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/HybridToInMemoryHashJoinRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/HybridToInMemoryHashJoinRule.java
new file mode 100644
index 0000000..b1577bf
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/HybridToInMemoryHashJoinRule.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
+
+/**
+ * Must run after {@link SetMemoryRequirementsRule}
+ */
+public final class HybridToInMemoryHashJoinRule implements IAlgebraicRewriteRule {
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ ILogicalOperator op = opRef.getValue();
+ if (op.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+ || op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+ AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) op;
+ if (joinOp.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.HYBRID_HASH_JOIN) {
+ return JoinUtils.hybridToInMemHashJoin(joinOp, context);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
index 4c57f21..8a42c31 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
@@ -104,7 +104,6 @@
//replace preclustered gby with sort gby
if (!groupByOperator.isGroupAll()) {
op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByVarList(),
- context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy(),
sortPhysicalOperator.getSortColumns()));
}
// remove the stable sort operator
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index e6cdc28..bc853f0 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -40,6 +40,7 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -174,11 +175,11 @@
protected final IOptimizationContext context;
- protected final PhysicalOptimizationConfig physicalOptimizationConfig;
+ protected final PhysicalOptimizationConfig physConfig;
protected AlgebricksPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
this.context = context;
- this.physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
+ this.physConfig = context.getPhysicalOptimizationConfig();
}
@Override
@@ -222,11 +223,9 @@
}
if (topLevelOp) {
- return new PreclusteredGroupByPOperator(gby.getGroupByVarList(), gby.isGroupAll(),
- context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+ return new PreclusteredGroupByPOperator(gby.getGroupByVarList(), gby.isGroupAll());
} else {
- return new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
- context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+ return new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList());
}
}
@@ -236,22 +235,27 @@
if (!hasIntermediateAgg) {
return null;
}
- return new ExternalGroupByPOperator(gby.getGroupByVarList(),
- physicalOptimizationConfig.getMaxFramesForGroupBy(),
- (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
- * physicalOptimizationConfig.getFrameSize());
+ return new ExternalGroupByPOperator(gby.getGroupByVarList());
}
@Override
public IPhysicalOperator visitInnerJoinOperator(InnerJoinOperator op, Boolean topLevelOp)
throws AlgebricksException {
- JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
- return op.getPhysicalOperator();
+ return visitAbstractBinaryJoinOperator(op, topLevelOp);
}
@Override
public IPhysicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Boolean topLevelOp)
throws AlgebricksException {
+ return visitAbstractBinaryJoinOperator(op, topLevelOp);
+ }
+
+ protected IPhysicalOperator visitAbstractBinaryJoinOperator(AbstractBinaryJoinOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ if (!topLevelOp) {
+ throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, op.getSourceLocation(),
+ op.getOperatorTag().toString() + " (micro)");
+ }
JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
return op.getPhysicalOperator();
}
@@ -270,9 +274,9 @@
public IPhysicalOperator visitOrderOperator(OrderOperator oo, Boolean topLevelOp) throws AlgebricksException {
ensureAllVariables(oo.getOrderExpressions(), Pair::getSecond);
if (topLevelOp) {
- return new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK());
+ return new StableSortPOperator(oo.getTopK());
} else {
- return new MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort());
+ return new MicroStableSortPOperator();
}
}
@@ -470,8 +474,7 @@
}
protected AbstractWindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
- return new WindowPOperator(op.getPartitionVarList(), op.getOrderColumnList(), false, false, false,
- context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ return new WindowPOperator(op.getPartitionVarList(), op.getOrderColumnList(), false, false, false);
}
// Physical operators for these operators must have been set already by rules that introduced them
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
new file mode 100644
index 0000000..4cecb07
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
+/**
+ * Set memory requirements for all operators as follows:
+ * <ol>
+ * <li>First call {@link IPhysicalOperator#createLocalMemoryRequirements(ILogicalOperator)}
+ * to initialize each operator's {@link LocalMemoryRequirements} with minimal memory budget required by
+ * that operator</li>
+ * <li>Then increase memory requirements for certain operators as specified by {@link PhysicalOptimizationConfig}</li>
+ * </ol>
+ */
+public class SetMemoryRequirementsRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ IPhysicalOperator physOp = op.getPhysicalOperator();
+ if (physOp.getLocalMemoryRequirements() != null) {
+ return false;
+ }
+ computeLocalMemoryRequirements(op, context, createMemoryRequirementsConfigurator(context));
+ return true;
+ }
+
+ private void computeLocalMemoryRequirements(AbstractLogicalOperator op, IOptimizationContext context,
+ ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor) throws AlgebricksException {
+ IPhysicalOperator physOp = op.getPhysicalOperator();
+ if (physOp.getLocalMemoryRequirements() == null) {
+ physOp.createLocalMemoryRequirements(op);
+ if (physOp.getLocalMemoryRequirements() == null) {
+ throw new IllegalStateException(physOp.getOperatorTag().toString());
+ }
+ op.accept(memoryRequirementsVisitor, null);
+ }
+ if (op.hasNestedPlans()) {
+ AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
+ for (ILogicalPlan p : nested.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> root : p.getRoots()) {
+ computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(), context,
+ createMemoryRequirementsConfigurator(context));
+ }
+ }
+ }
+ for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
+ computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(), context,
+ createMemoryRequirementsConfigurator(context));
+ }
+ }
+
+ protected ILogicalOperatorVisitor<Void, Void> createMemoryRequirementsConfigurator(IOptimizationContext context) {
+ return new MemoryRequirementsConfigurator(context);
+ }
+
+ protected static class MemoryRequirementsConfigurator implements ILogicalOperatorVisitor<Void, Void> {
+
+ protected final IOptimizationContext context;
+
+ protected final PhysicalOptimizationConfig physConfig;
+
+ protected MemoryRequirementsConfigurator(IOptimizationContext context) {
+ this.context = context;
+ this.physConfig = context.getPhysicalOptimizationConfig();
+ }
+
+ // helper methods
+
+ protected void setOperatorMemoryBudget(AbstractLogicalOperator op, int memBudgetInFrames)
+ throws AlgebricksException {
+ LocalMemoryRequirements memoryReqs = op.getPhysicalOperator().getLocalMemoryRequirements();
+ int minBudgetInFrames = memoryReqs.getMinMemoryBudgetInFrames();
+ if (memBudgetInFrames < minBudgetInFrames) {
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_MEMORY_BUDGET, op.getSourceLocation(),
+ op.getOperatorTag().toString(), memBudgetInFrames * physConfig.getFrameSize(),
+ minBudgetInFrames * physConfig.getFrameSize());
+ }
+ memoryReqs.setMemoryBudgetInFrames(memBudgetInFrames);
+ }
+
+ // variable memory operators
+
+ @Override
+ public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+ setOperatorMemoryBudget(op, physConfig.getMaxFramesExternalSort());
+ return null;
+ }
+
+ @Override
+ public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+ setOperatorMemoryBudget(op, physConfig.getMaxFramesForGroupBy());
+ return null;
+ }
+
+ @Override
+ public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+ if (op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW) {
+ setOperatorMemoryBudget(op, physConfig.getMaxFramesForWindow());
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+ return visitJoinOperator(op, arg);
+ }
+
+ @Override
+ public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+ return visitJoinOperator(op, arg);
+ }
+
+ protected Void visitJoinOperator(AbstractBinaryJoinOperator op, Void arg) throws AlgebricksException {
+ setOperatorMemoryBudget(op, physConfig.getMaxFramesForJoin());
+ return null;
+ }
+
+ @Override
+ public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+ return visitAbstractUnnestMapOperator(op, arg);
+ }
+
+ @Override
+ public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg)
+ throws AlgebricksException {
+ return visitAbstractUnnestMapOperator(op, arg);
+ }
+
+ protected Void visitAbstractUnnestMapOperator(AbstractUnnestMapOperator op, Void arg)
+ throws AlgebricksException {
+ IPhysicalOperator physOp = op.getPhysicalOperator();
+ if (physOp.getOperatorTag() == PhysicalOperatorTag.LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH
+ || physOp.getOperatorTag() == PhysicalOperatorTag.SINGLE_PARTITION_INVERTED_INDEX_SEARCH) {
+ setOperatorMemoryBudget(op, physConfig.getMaxFramesForTextSearch());
+ }
+ return null;
+ }
+
+ // fixed memory operators
+
+ @Override
+ public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg)
+ throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+ throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 3042548..6687027 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -52,7 +52,7 @@
}
public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, boolean topLevelOp,
- IOptimizationContext context) throws AlgebricksException {
+ IOptimizationContext context) {
if (!topLevelOp) {
throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag());
}
@@ -87,30 +87,28 @@
}
}
} else {
- setNestedLoopJoinOp(op, context);
+ setNestedLoopJoinOp(op);
}
}
- private static void setNestedLoopJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
- op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
- context.getPhysicalOptimizationConfig().getMaxFramesForJoin()));
+ private static void setNestedLoopJoinOp(AbstractBinaryJoinOperator op) {
+ op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST));
}
private static void setHashJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType,
- List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context)
- throws AlgebricksException {
+ List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context) {
op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight,
- context.getPhysicalOptimizationConfig().getMaxFramesForJoin(),
context.getPhysicalOptimizationConfig().getMaxFramesForJoinLeftInput(),
context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(),
context.getPhysicalOptimizationConfig().getFudgeFactor()));
- if (partitioningType == JoinPartitioningType.BROADCAST) {
- hybridToInMemHashJoin(op, context);
- }
}
- private static void hybridToInMemHashJoin(AbstractBinaryJoinOperator op, IOptimizationContext context)
+ public static boolean hybridToInMemHashJoin(AbstractBinaryJoinOperator op, IOptimizationContext context)
throws AlgebricksException {
+ HybridHashJoinPOperator hhj = (HybridHashJoinPOperator) op.getPhysicalOperator();
+ if (hhj.getPartitioningType() != JoinPartitioningType.BROADCAST) {
+ return false;
+ }
ILogicalOperator opBuild = op.getInputs().get(1).getValue();
LogicalPropertiesVisitor.computeLogicalPropertiesDFS(opBuild, context);
ILogicalPropertiesVector v = context.getLogicalPropertiesVector(opBuild);
@@ -121,19 +119,19 @@
}
if (v != null) {
int size2 = v.getMaxOutputFrames();
- HybridHashJoinPOperator hhj = (HybridHashJoinPOperator) op.getPhysicalOperator();
- if (size2 > 0 && size2 * hhj.getFudgeFactor() <= hhj.getMemSizeInFrames()) {
+ int hhjMemSizeInFrames = hhj.getLocalMemoryRequirements().getMemoryBudgetInFrames();
+ if (size2 > 0 && size2 * hhj.getFudgeFactor() <= hhjMemSizeInFrames) {
if (loggerTraceEnabled) {
AlgebricksConfig.ALGEBRICKS_LOGGER
.trace("// HybridHashJoin inner branch " + opBuild.getOperatorTag() + " fits in memory\n");
}
// maintains the local properties on the probe side
- op.setPhysicalOperator(
- new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(), hhj.getKeysLeftBranch(),
- hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2, hhj.getMemSizeInFrames()));
+ op.setPhysicalOperator(new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(),
+ hhj.getKeysLeftBranch(), hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2));
+ return true;
}
}
-
+ return false;
}
private static boolean isHashJoinCondition(ILogicalExpression e, Collection<LogicalVariable> inLeftAll,