[ASTERIXDB-2263][RT] Use Plan Stages To Estimate Resources
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Introduce PlanStagesGenerator that generates
plan stages using blocking/two-phased operators.
- Introduce OperatorResourcesComputer that
calculates the estimated resources for any
logical operator.
- Estimate a job's required resources based on
the stage that requires the most resources in
the plan.
Change-Id: Ic715c5733621e27049677f44e1ddaa0dd2c71baf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2299
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
new file mode 100644
index 0000000..7eaaa3d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.resource;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+
+public class OperatorResourcesComputer {
+ // Estimates the cores and memory that a single logical operator of a plan requires.
+ public static final int MIN_OPERATOR_CORES = 1; // result for operators that are neither PARTITIONED nor LOCAL
+ private static final long MAX_BUFFER_PER_CONNECTION = 1L; // NOTE(review): presumably frames per connection - confirm
+
+ private final int numComputationPartitions;
+ private final long groupByMemorySize; // groupFrameLimit * frameSize
+ private final long joinMemorySize; // joinFrameLimit * frameSize
+ private final long sortMemorySize; // sortFrameLimit * frameSize
+ private final long frameSize;
+ /** Derives the group-by, join and sort memory sizes by multiplying each frame limit by {@code frameSize}. */
+ public OperatorResourcesComputer(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit,
+ int joinFrameLimit, long frameSize) {
+ this.numComputationPartitions = numComputationPartitions;
+ this.groupByMemorySize = groupFrameLimit * frameSize;
+ this.joinMemorySize = joinFrameLimit * frameSize;
+ this.sortMemorySize = sortFrameLimit * frameSize;
+ this.frameSize = frameSize;
+ }
+ /** Returns numComputationPartitions for PARTITIONED or LOCAL operators, otherwise MIN_OPERATOR_CORES. */
+ public int getOperatorRequiredCores(ILogicalOperator operator) {
+ if (operator.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+ || operator.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+ return numComputationPartitions;
+ }
+ return MIN_OPERATOR_CORES;
+ }
+ /** Returns the memory estimate selected by the operator tag; throws IllegalStateException for an unlisted tag. */
+ public long getOperatorRequiredMemory(ILogicalOperator operator) {
+ switch (operator.getOperatorTag()) {
+ case AGGREGATE:
+ case ASSIGN:
+ case DATASOURCESCAN:
+ case DISTINCT:
+ case DISTRIBUTE_RESULT:
+ case EMPTYTUPLESOURCE:
+ case DELEGATE_OPERATOR:
+ case EXTERNAL_LOOKUP:
+ case LEFT_OUTER_UNNEST_MAP:
+ case LIMIT:
+ case MATERIALIZE:
+ case NESTEDTUPLESOURCE:
+ case PROJECT:
+ case REPLICATE:
+ case RUNNINGAGGREGATE:
+ case SCRIPT:
+ case SELECT:
+ case SINK:
+ case SPLIT:
+ case SUBPLAN:
+ case TOKENIZE:
+ case UNIONALL:
+ case UNNEST:
+ case LEFT_OUTER_UNNEST:
+ case UNNEST_MAP:
+ case UPDATE:
+ case WRITE:
+ case WRITE_RESULT:
+ case INDEX_INSERT_DELETE_UPSERT:
+ case INSERT_DELETE_UPSERT:
+ case INTERSECT:
+ return getOperatorRequiredMemory(operator, frameSize); // all tags above are estimated at frameSize
+ case EXCHANGE:
+ return getExchangeRequiredMemory((ExchangeOperator) operator);
+ case GROUP:
+ return getOperatorRequiredMemory(operator, groupByMemorySize);
+ case ORDER:
+ return getOperatorRequiredMemory(operator, sortMemorySize);
+ case INNERJOIN:
+ case LEFTOUTERJOIN:
+ return getOperatorRequiredMemory(operator, joinMemorySize);
+ default:
+ throw new IllegalStateException("Unrecognized operator: " + operator.getOperatorTag());
+ }
+ }
+ /** Scales {@code memorySize} by numComputationPartitions for PARTITIONED or LOCAL operators, else returns it as is. */
+ private long getOperatorRequiredMemory(ILogicalOperator op, long memorySize) {
+ if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+ || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+ return memorySize * numComputationPartitions;
+ }
+ return memorySize;
+ }
+ /** Estimates exchange memory: frameSize-based for one-to-one and sort-merge, quadratic in partitions otherwise. */
+ private long getExchangeRequiredMemory(ExchangeOperator op) {
+ final IPhysicalOperator physicalOperator = op.getPhysicalOperator(); // NOTE(review): assumed non-null, used below
+ final PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag();
+ if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE
+ || physicalOperatorTag == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
+ return getOperatorRequiredMemory(op, frameSize);
+ }
+ return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java
new file mode 100644
index 0000000..1e623c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.resource;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+
+public class PlanStage {
+ // A set of logical operators whose resource requirements are aggregated together.
+ private final Set<ILogicalOperator> operators = new HashSet<>();
+ private final int stageId; // within this class only read by toString()
+
+ PlanStage(int stageId) {
+ this.stageId = stageId;
+ }
+
+ @Override
+ public String toString() {
+ return "Stage{" + "stageId=" + stageId + ", operators(" + operators.size() + ")" + "=" + operators + '}';
+ }
+ /** Returns the backing set itself (not a copy), so callers can add operators to this stage through it. */
+ public Set<ILogicalOperator> getOperators() {
+ return operators;
+ }
+ /** Returns the sum of the memory estimates that {@code resourcesComputer} gives for every operator of the stage. */
+ public long getRequiredMemory(OperatorResourcesComputer resourcesComputer) {
+ return operators.stream().mapToLong(resourcesComputer::getOperatorRequiredMemory).sum();
+ }
+ /** Returns the largest core estimate among the operators, or MIN_OPERATOR_CORES when the stage has none. */
+ public int getRequiredCores(OperatorResourcesComputer resourcesComputer) {
+ return operators.stream().mapToInt(resourcesComputer::getOperatorRequiredCores).max()
+ .orElse(OperatorResourcesComputer.MIN_OPERATOR_CORES);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java
new file mode 100644
index 0000000..8b32375
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.resource;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public class PlanStagesGenerator implements ILogicalOperatorVisitor<Void, Void> {
+ // Visitor that splits a plan into PlanStages; joins, blocking group-bys and orders each open a new stage.
+ private static final int JOIN_FIRST_INPUT = 1; // input numbers are 1-based, see getJoinOperatorInput
+ private static final int JOIN_SECOND_INPUT = 2;
+ private final Set<ILogicalOperator> visitedOperators = new HashSet<>(); // replicate/split operators already seen
+ private final LinkedList<ILogicalOperator> pendingBlockingOperators = new LinkedList<>(); // FIFO: add() / pop()
+ private final List<PlanStage> stages = new ArrayList<>();
+ private PlanStage currentStage; // stage that receives the operators currently being visited
+ private int stageCounter; // pre-incremented before use, so stage ids start at 1
+
+ public PlanStagesGenerator() {
+ currentStage = new PlanStage(++stageCounter);
+ stages.add(currentStage);
+ }
+ // Except for replicate/split, every visitXxx method below simply delegates to visit(ILogicalOperator).
+ @Override
+ public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ // Traverses a replicate operator only on its first visit; later visits merge the current stage into its stage.
+ if (!visitedOperators.contains(op)) {
+ visitedOperators.add(op);
+ visit(op);
+ } else {
+ merge(op);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+ // Traverses a split operator only on its first visit; later visits merge the current stage into its stage.
+ if (!visitedOperators.contains(op)) {
+ visitedOperators.add(op);
+ visit(op);
+ } else {
+ merge(op);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+ throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+
+ @Override
+ public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ visit(op);
+ return null;
+ }
+ /** Returns the internal list of stages built so far (not a copy). */
+ public List<PlanStage> getStages() {
+ return stages;
+ }
+ /** Adds {@code op} to the current stage, then opens a stage for the oldest pending blocking operator, if any. */
+ private void visit(ILogicalOperator op) throws AlgebricksException {
+ addToStage(op);
+ if (!pendingBlockingOperators.isEmpty()) {
+ final ILogicalOperator firstPending = pendingBlockingOperators.pop();
+ visitBlocking(firstPending);
+ }
+ }
+ /** Opens a new stage that holds {@code blockingOp} and continues the traversal from its not-yet-visited inputs. */
+ private void visitBlocking(ILogicalOperator blockingOp) throws AlgebricksException {
+ final PlanStage blockingOpStage = new PlanStage(++stageCounter);
+ blockingOpStage.getOperators().add(blockingOp); // it is also in the stage where addToStage queued it
+ stages.add(blockingOpStage);
+ currentStage = blockingOpStage;
+ switch (blockingOp.getOperatorTag()) {
+ case INNERJOIN:
+ case LEFTOUTERJOIN:
+ // the first input was already visited by addToStage, so visit only the second input
+ ILogicalOperator joinSecondInput = getJoinOperatorInput(blockingOp, JOIN_SECOND_INPUT);
+ joinSecondInput.accept(this, null);
+ break;
+ case GROUP:
+ case ORDER:
+ visitInputs(blockingOp);
+ break;
+ default:
+ throw new IllegalStateException("Unrecognized blocking operator: " + blockingOp.getOperatorTag());
+ }
+ }
+ /** Adds {@code op} to the current stage and either queues it as blocking or keeps traversing its inputs. */
+ private void addToStage(ILogicalOperator op) throws AlgebricksException {
+ currentStage.getOperators().add(op);
+ switch (op.getOperatorTag()) {
+ case INNERJOIN:
+ case LEFTOUTERJOIN:
+ pendingBlockingOperators.add(op);
+ // the first input continues on the same stage; the second input is visited later by visitBlocking
+ final ILogicalOperator joinFirstInput = getJoinOperatorInput(op, JOIN_FIRST_INPUT);
+ joinFirstInput.accept(this, null);
+ break;
+ case GROUP:
+ if (isBlockingGroupBy((GroupByOperator) op)) {
+ pendingBlockingOperators.add(op);
+ return;
+ }
+ // a non-blocking group-by continues on the same stage
+ visitInputs(op);
+ break;
+ case ORDER:
+ pendingBlockingOperators.add(op); // its inputs are visited later by visitBlocking
+ break;
+ default:
+ visitInputs(op);
+ break;
+ }
+ }
+ /** Visits every input of {@code op}, unless {@code op} is materialized by a replicate/split input. */
+ private void visitInputs(ILogicalOperator op) throws AlgebricksException {
+ if (isMaterialized(op)) {
+ // don't visit the inputs of this operator since it is supposed to be blocking due to materialization.
+ // some other non-blocking operator will visit those inputs when reached.
+ return;
+ }
+ for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+ inputOpRef.getValue().accept(this, null);
+ }
+ }
+ /** Returns true when the physical operator of the group-by is EXTERNAL_GROUP_BY or SORT_GROUP_BY. */
+ private boolean isBlockingGroupBy(GroupByOperator op) {
+ return op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.EXTERNAL_GROUP_BY
+ || op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.SORT_GROUP_BY;
+ }
+
+ /**
+ * Checks whether the operator {@code op} is supposed to be materialized
+ * due to a replicate/split operator among its inputs.
+ *
+ * @param op the operator whose inputs are inspected
+ * @return true if the operator will be materialized. Otherwise false
+ */
+ private boolean isMaterialized(ILogicalOperator op) {
+ for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+ final ILogicalOperator inputOp = inputOpRef.getValue();
+ final LogicalOperatorTag inputOpTag = inputOp.getOperatorTag();
+ if (inputOpTag == LogicalOperatorTag.REPLICATE || inputOpTag == LogicalOperatorTag.SPLIT) {
+ final AbstractReplicateOperator replicateOperator = (AbstractReplicateOperator) inputOp;
+ if (replicateOperator.isMaterialized(op)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ /** Returns the join input at 1-based position {@code inputNum}; rejects other positions and non-binary joins. */
+ private ILogicalOperator getJoinOperatorInput(ILogicalOperator op, int inputNum) {
+ if (inputNum != JOIN_FIRST_INPUT && inputNum != JOIN_SECOND_INPUT) {
+ throw new IllegalArgumentException("invalid input number for join operator");
+ }
+ final List<Mutable<ILogicalOperator>> inputs = op.getInputs();
+ if (inputs.size() != 2) {
+ throw new IllegalStateException("Join must have exactly two inputs. Current inputs: " + inputs.size());
+ }
+ return op.getInputs().get(inputNum - 1).getValue();
+ }
+
+ /**
+ * Merges all operators on the current stage into another stage that already contains {@code op}, if there is one.
+ *
+ * @param op an already visited operator; the current stage is dropped and the stage holding it becomes current
+ */
+ private void merge(ILogicalOperator op) {
+ // all operators in this stage belong to the stage of the already visited op
+ for (PlanStage stage : stages) {
+ if (stage != currentStage && stage.getOperators().contains(op)) {
+ stage.getOperators().addAll(currentStage.getOperators());
+ stages.remove(currentStage);
+ currentStage = stage;
+ break; // the list was just modified, so the iteration must stop here
+ }
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index ccda1e7..0149ffa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -19,7 +19,11 @@
package org.apache.asterix.utils;
-import org.apache.asterix.app.resource.RequiredCapacityVisitor;
+import java.util.List;
+
+import org.apache.asterix.app.resource.OperatorResourcesComputer;
+import org.apache.asterix.app.resource.PlanStage;
+import org.apache.asterix.app.resource.PlanStagesGenerator;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -54,16 +58,30 @@
final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy();
final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin();
-
- // Creates a cluster capacity visitor.
- IClusterCapacity clusterCapacity = new ClusterCapacity();
- RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length,
- sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, clusterCapacity);
-
- // There could be only one root operator for a top-level query plan.
- ILogicalOperator rootOp = plan.getRoots().get(0).getValue();
- rootOp.accept(visitor, null);
- return clusterCapacity;
+ final List<PlanStage> planStages = getStages(plan);
+ return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, sortFrameLimit,
+ groupFrameLimit, joinFrameLimit, frameSize);
}
+ public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException {
+     final PlanStagesGenerator generator = new PlanStagesGenerator();
+     // A top-level query plan has a single root operator, so the traversal starts from the first root;
+     // the generator collects the stages while it walks the plan from there.
+     plan.getRoots().get(0).getValue().accept(generator, null);
+     return generator.getStages();
+ }
+
+ public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
+         int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int frameSize) {
+     final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, sortFrameLimit,
+             groupFrameLimit, joinFrameLimit, frameSize);
+     final IClusterCapacity clusterCapacity = new ClusterCapacity(); // sized by the most demanding stage
+     final long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
+             .orElseThrow(() -> new IllegalStateException("No plan stages to estimate required memory from"));
+     clusterCapacity.setAggregatedMemoryByteSize(maxRequiredMemory); // max() yields a primitive, keep it unboxed
+     final int maxRequiredCores = stages.stream().mapToInt(stage -> stage.getRequiredCores(computer)).max()
+             .orElseThrow(() -> new IllegalStateException("No plan stages to estimate required cores from"));
+     clusterCapacity.setAggregatedCores(maxRequiredCores); // the core peak may come from a different stage
+     return clusterCapacity;
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
new file mode 100644
index 0000000..0e55b1e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.resource;
+
+import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.GROUP;
+import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.INNERJOIN;
+import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.LEFTOUTERJOIN;
+import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.ORDER;
+import static org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode.LOCAL;
+import static org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode.PARTITIONED;
+import static org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode.UNPARTITIONED;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import org.apache.asterix.utils.ResourceUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PlanStagesGeneratorTest {
+
+ // Operator tags that validateOperatorStages() expects to find in exactly two stages of a multi-stage plan.
+ private static final Set<LogicalOperatorTag> BLOCKING_OPERATORS =
+ new HashSet<>(Arrays.asList(INNERJOIN, LEFTOUTERJOIN, ORDER));
+ // 32 MiB in bytes; only used to derive FRAME_LIMIT.
+ private static final long MEMORY_BUDGET = 32L * 1024 * 1024;
+ // 32 KiB in bytes.
+ private static final int FRAME_SIZE = 32 * 1024;
+ // Number of frames that fit into MEMORY_BUDGET; passed as the sort, group and join frame limit.
+ private static final int FRAME_LIMIT = (int) (MEMORY_BUDGET / FRAME_SIZE);
+ // Passed as the number of computation locations when the required capacity is estimated.
+ private static final int PARALLELISM = 10;
+ // Factor used in the expected memory of a hash-partition exchange.
+ private static final long MAX_BUFFER_PER_CONNECTION = 1L;
+
+ /**
+ * Builds a plan without any blocking operator (ets -> assign -> one-to-one exchange -> distribute result, all
+ * UNPARTITIONED) and expects a single stage that needs one frame of memory per operator.
+ */
+ @Test
+ public void noBlockingPlan() throws AlgebricksException {
+ EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+ ets.setExecutionMode(UNPARTITIONED);
+
+ AssignOperator assignOperator = new AssignOperator(Collections.emptyList(), null);
+ assignOperator.setExecutionMode(UNPARTITIONED);
+ assignOperator.getInputs().add(new MutableObject<>(ets));
+
+ ExchangeOperator exchange = new ExchangeOperator();
+ exchange.setExecutionMode(UNPARTITIONED);
+ exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+ exchange.getInputs().add(new MutableObject<>(assignOperator));
+
+ DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+ resultOperator.setExecutionMode(UNPARTITIONED);
+ resultOperator.getInputs().add(new MutableObject<>(exchange));
+ // diamond instead of the raw MutableObject type, so the root list is built without an unchecked conversion
+ ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
+
+ List<PlanStage> stages = ResourceUtils.getStages(plan);
+ // ensure a single stage plan
+ final int expectedStages = 1;
+ Assert.assertEquals(expectedStages, stages.size());
+ validateStages(stages, resultOperator, exchange, ets, assignOperator);
+ // frame size for every operator; widened to long before multiplying
+ final long expectedMemory = (long) stages.get(0).getOperators().size() * FRAME_SIZE;
+ assertRequiredMemory(stages, expectedMemory);
+ }
+
+ /**
+ * Builds ets -> scan -> one-to-one exchange -> pre-clustered group-by -> order -> distribute result (all
+ * PARTITIONED) and expects two stages, with the memory estimate taken from the stage that ends at the order
+ * operator.
+ */
+ @Test
+ public void testNonBlockingGroupByOrderBy() throws AlgebricksException {
+ EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+ ets.setExecutionMode(PARTITIONED);
+
+ DataSourceScanOperator scanOperator = new DataSourceScanOperator(Collections.emptyList(), null);
+ scanOperator.setExecutionMode(PARTITIONED);
+ scanOperator.getInputs().add(new MutableObject<>(ets));
+
+ ExchangeOperator exchange = new ExchangeOperator();
+ exchange.setExecutionMode(PARTITIONED);
+ exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+ exchange.getInputs().add(new MutableObject<>(scanOperator));
+
+ GroupByOperator groupByOperator = new GroupByOperator();
+ groupByOperator.setExecutionMode(PARTITIONED);
+ groupByOperator
+ .setPhysicalOperator(new PreclusteredGroupByPOperator(Collections.emptyList(), true, FRAME_LIMIT));
+ groupByOperator.getInputs().add(new MutableObject<>(exchange));
+
+ OrderOperator orderOperator = new OrderOperator();
+ orderOperator.setExecutionMode(PARTITIONED);
+ orderOperator.getInputs().add(new MutableObject<>(groupByOperator));
+
+ DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+ resultOperator.setExecutionMode(PARTITIONED);
+ resultOperator.getInputs().add(new MutableObject<>(orderOperator));
+ // diamond instead of the raw MutableObject type, so the root list is built without an unchecked conversion
+ ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
+
+ final List<PlanStage> stages = ResourceUtils.getStages(plan);
+ validateStages(stages, ets, exchange, groupByOperator, orderOperator, resultOperator);
+ // ensure 2 stages (root to order, order to ets); the pre-clustered group-by is expected to share a stage
+ // with its input instead of starting a new one
+ final int expectedStages = 2;
+ Assert.assertEquals(expectedStages, stages.size());
+
+ // dominating stage should have orderBy, orderBy's input (groupby), groupby's input (exchange),
+ // exchange's input (scanOperator), and scanOperator's input (ets)
+ // products are widened to long first so they cannot overflow int if the constants grow
+ long orderOperatorRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long groupByOperatorRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long exchangeRequiredMemory = (long) PARALLELISM * FRAME_SIZE;
+ long scanOperatorRequiredMemory = (long) PARALLELISM * FRAME_SIZE;
+ long etsRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+
+ final long expectedMemory = orderOperatorRequiredMemory + groupByOperatorRequiredMemory + exchangeRequiredMemory
+ + scanOperatorRequiredMemory + etsRequiredMemory;
+ assertRequiredMemory(stages, expectedMemory);
+ }
+
+ /**
+ * Builds a plan in which an inner join of two scans and a LOCAL external group-by each feed, through a
+ * hash-partition exchange, a left outer join placed under the distribute-result root. Expects four stages and
+ * a memory estimate equal to the stage running from the root down the first input of each join.
+ */
+ @Test
+ public void testJoinGroupby() throws AlgebricksException {
+ EmptyTupleSourceOperator ets1 = new EmptyTupleSourceOperator();
+ ets1.setExecutionMode(PARTITIONED);
+
+ DataSourceScanOperator scanOperator1 = new DataSourceScanOperator(Collections.emptyList(), null);
+ scanOperator1.setExecutionMode(PARTITIONED);
+ scanOperator1.getInputs().add(new MutableObject<>(ets1));
+
+ EmptyTupleSourceOperator ets2 = new EmptyTupleSourceOperator();
+ // configure the operator that was just created (the mode was previously set on ets1 again)
+ ets2.setExecutionMode(PARTITIONED);
+
+ DataSourceScanOperator scanOperator2 = new DataSourceScanOperator(Collections.emptyList(), null);
+ scanOperator2.setExecutionMode(PARTITIONED);
+ scanOperator2.getInputs().add(new MutableObject<>(ets2));
+
+ InnerJoinOperator firstJoin = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+ firstJoin.setExecutionMode(PARTITIONED);
+ firstJoin.getInputs().add(new MutableObject<>(scanOperator1));
+ firstJoin.getInputs().add(new MutableObject<>(scanOperator2));
+
+ ExchangeOperator exchangeOperator1 = new ExchangeOperator();
+ exchangeOperator1.setExecutionMode(PARTITIONED);
+ exchangeOperator1.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ exchangeOperator1.getInputs().add(new MutableObject<>(firstJoin));
+
+ EmptyTupleSourceOperator ets3 = new EmptyTupleSourceOperator();
+ // configure the operator that was just created (the mode was previously set on ets1 again)
+ ets3.setExecutionMode(PARTITIONED);
+
+ GroupByOperator groupByOperator = new GroupByOperator();
+ groupByOperator
+ .setPhysicalOperator(new ExternalGroupByPOperator(Collections.emptyList(), FRAME_LIMIT, FRAME_LIMIT));
+ groupByOperator.setExecutionMode(LOCAL);
+ groupByOperator.getInputs().add(new MutableObject<>(ets3));
+
+ ExchangeOperator exchangeOperator2 = new ExchangeOperator();
+ exchangeOperator2.setExecutionMode(PARTITIONED);
+ exchangeOperator2.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+ exchangeOperator2.getInputs().add(new MutableObject<>(groupByOperator));
+
+ LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+ secondJoin.setExecutionMode(PARTITIONED);
+ secondJoin.getInputs().add(new MutableObject<>(exchangeOperator1));
+ secondJoin.getInputs().add(new MutableObject<>(exchangeOperator2));
+
+ DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+ resultOperator.setExecutionMode(PARTITIONED);
+ resultOperator.getInputs().add(new MutableObject<>(secondJoin));
+ // diamond instead of the raw MutableObject type, so the root list is built without an unchecked conversion
+ ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
+
+ List<PlanStage> stages = ResourceUtils.getStages(plan);
+ final int expectedStages = 4;
+ Assert.assertEquals(expectedStages, stages.size());
+ validateStages(stages, ets1, scanOperator1, ets2, scanOperator2, firstJoin, exchangeOperator1, ets3,
+ groupByOperator, exchangeOperator2, secondJoin, resultOperator);
+
+ // dominating stage should have the following operators:
+ // resultOperator, its input (secondJoin), secondJoin's first input (exchangeOperator1), exchangeOperator1's
+ // input (firstJoin), firstJoin's first input (scanOperator1), and scanOperator1's input (ets1)
+ // products are widened to long first so they cannot overflow int if the constants grow
+ long resultOperatorRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+ long secondJoinRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long exchangeOperator1RequiredMemory = 2 * MAX_BUFFER_PER_CONNECTION * PARALLELISM * PARALLELISM * FRAME_SIZE;
+ long firstJoinRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long scanOperator1RequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+ long ets1RequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+
+ long expectedMemory = resultOperatorRequiredMemory + secondJoinRequiredMemory + exchangeOperator1RequiredMemory
+ + firstJoinRequiredMemory + scanOperator1RequiredMemory + ets1RequiredMemory;
+ assertRequiredMemory(stages, expectedMemory);
+ }
+
+ /**
+ * Builds a plan in which one scan feeds a replicate operator, two order operators both read from that
+ * replicate, and a left outer join combines the two orders under the distribute-result root. Expects three
+ * stages and a memory estimate equal to the stage that runs from the join through its second input to ets.
+ */
+ @Test
+ public void testReplicateSortJoin() throws AlgebricksException {
+ EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+ ets.setExecutionMode(PARTITIONED);
+
+ DataSourceScanOperator scanOperator = new DataSourceScanOperator(Collections.emptyList(), null);
+ scanOperator.setExecutionMode(PARTITIONED);
+ scanOperator.getInputs().add(new MutableObject<>(ets));
+
+ ReplicateOperator replicateOperator = new ReplicateOperator(2);
+ replicateOperator.setExecutionMode(PARTITIONED);
+ replicateOperator.getInputs().add(new MutableObject<>(scanOperator));
+
+ // NOTE(review): an exchange physical operator on an order operator looks like a copy-paste; it is kept
+ // unchanged here because the test's expectations were written against it - confirm whether it is needed
+ OrderOperator order1 = new OrderOperator();
+ order1.setExecutionMode(PARTITIONED);
+ order1.setPhysicalOperator(new OneToOneExchangePOperator());
+ order1.getInputs().add(new MutableObject<>(replicateOperator));
+
+ OrderOperator order2 = new OrderOperator();
+ order2.setExecutionMode(PARTITIONED);
+ order2.setPhysicalOperator(new OneToOneExchangePOperator());
+ order2.getInputs().add(new MutableObject<>(replicateOperator));
+
+ LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+ secondJoin.setExecutionMode(PARTITIONED);
+ secondJoin.getInputs().add(new MutableObject<>(order1));
+ secondJoin.getInputs().add(new MutableObject<>(order2));
+
+ DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+ resultOperator.setExecutionMode(PARTITIONED);
+ resultOperator.getInputs().add(new MutableObject<>(secondJoin));
+ // diamond instead of the raw MutableObject type, so the root list is built without an unchecked conversion
+ ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
+
+ List<PlanStage> stages = ResourceUtils.getStages(plan);
+ final int expectedStages = 3;
+ Assert.assertEquals(expectedStages, stages.size());
+ // pass every operator of the plan so that the existence check is actually exercised
+ validateStages(stages, ets, scanOperator, replicateOperator, order1, order2, secondJoin, resultOperator);
+
+ // dominating stage should have the following operators:
+ // secondJoin, secondJoin's second input (order2), order2's input (replicate),
+ // replicate's input (scanOperator), scanOperator's input (ets)
+ // products are widened to long first so they cannot overflow int if the constants grow
+ long secondJoinRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long order2RequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+ long replicateOperatorRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+ long scanOperatorRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+ long etsRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+ long expectedMemory = secondJoinRequiredMemory + order2RequiredMemory + replicateOperatorRequiredMemory
+ + scanOperatorRequiredMemory + etsRequiredMemory;
+ assertRequiredMemory(stages, expectedMemory);
+ }
+
+ /**
+ * Checks a stage list against the operators of a plan: each operator passed in must belong to at least one
+ * stage, and every operator found in any stage must occur in as many stages as validateOperatorStages expects.
+ *
+ * @param stages the generated stages
+ * @param operators the operators that must be present in some stage; may be empty
+ */
+ private void validateStages(List<PlanStage> stages, ILogicalOperator... operators) {
+ // every expected operator has to show up in some stage
+ Stream.of(operators).forEach(expected -> ensureOperatorExists(stages, expected));
+ // every operator that made it into a stage has to appear in the right number of stages
+ stages.stream().map(PlanStage::getOperators)
+ .forEach(stageOperators -> stageOperators.forEach(member -> validateOperatorStages(stages, member)));
+ }
+
+ /**
+ * Asserts that the given operator is contained in at least one of the stages.
+ *
+ * @param stages the generated stages
+ * @param operator the operator to look for
+ */
+ private void ensureOperatorExists(List<PlanStage> stages, ILogicalOperator operator) {
+ // anyMatch stops at the first stage that contains the operator; only existence matters here
+ final boolean found = stages.stream().map(PlanStage::getOperators).anyMatch(ops -> ops.contains(operator));
+ // name the operator in the failure message so a missing one can be identified
+ Assert.assertTrue("operator " + operator.getOperatorTag() + " is missing from every stage", found);
+ }
+
+ /**
+ * Asserts that the operator occurs in the expected number of stages. Operators whose tag is in
+ * BLOCKING_OPERATORS, and group-by operators with an external or sort-based physical operator, are expected in
+ * two stages; every other operator in one. Nothing is checked when there is only a single stage.
+ *
+ * @param stages the generated stages
+ * @param operator the operator whose occurrences are counted
+ */
+ private void validateOperatorStages(List<PlanStage> stages, ILogicalOperator operator) {
+ if (stages.size() == 1) {
+ return;
+ }
+ final LogicalOperatorTag logicalTag = operator.getOperatorTag();
+ long expectedAppearances = 1;
+ if (BLOCKING_OPERATORS.contains(logicalTag)) {
+ expectedAppearances = 2;
+ } else if (logicalTag == GROUP) {
+ // GROUP is not part of BLOCKING_OPERATORS; whether it counts twice depends on its physical operator
+ final PhysicalOperatorTag physicalTag =
+ ((GroupByOperator) operator).getPhysicalOperator().getOperatorTag();
+ if (physicalTag == PhysicalOperatorTag.EXTERNAL_GROUP_BY
+ || physicalTag == PhysicalOperatorTag.SORT_GROUP_BY) {
+ expectedAppearances = 2;
+ }
+ }
+ // count the stages that contain the operator
+ long actualAppearances = 0;
+ for (PlanStage stage : stages) {
+ if (stage.getOperators().contains(operator)) {
+ actualAppearances++;
+ }
+ }
+ Assert.assertEquals(expectedAppearances, actualAppearances);
+ }
+
+ /**
+ * Computes the stage-based required capacity for the given stages, using PARALLELISM computation locations,
+ * FRAME_LIMIT for the sort, group and join limits and FRAME_SIZE as the frame size, and asserts that its
+ * aggregated memory equals the expected value.
+ *
+ * @param stages the generated stages
+ * @param expectedMemory the expected aggregated memory in bytes
+ */
+ private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) {
+ final IClusterCapacity clusterCapacity = ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM,
+ FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE);
+ // assertEquals takes (expected, actual); with the arguments swapped a failure message reports them inverted
+ Assert.assertEquals(expectedMemory, clusterCapacity.getAggregatedMemoryByteSize());
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
index 852c392..3bb0f47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -109,4 +109,12 @@
return false;
}
+ /**
+ * Looks up the materialization flag of the output that refers to the given operator.
+ *
+ * @param op the operator to look for among the outputs; compared by reference, not by equals()
+ * @return the materialization flag stored at the position of the first matching output, or {@code false} if
+ * no output refers to {@code op}
+ */
+ public boolean isMaterialized(ILogicalOperator op) {
+ for (int position = 0; position < outputs.size(); position++) {
+ final boolean refersToOp = outputs.get(position).getValue() == op;
+ if (refersToOp) {
+ // flags are indexed in parallel with the outputs
+ return outputMaterializationFlags[position];
+ }
+ }
+ return false;
+ }
}