[ASTERIXDB-3466][COMP] Make operators minimum frames budget configurable
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
For operators that require a (minimum) budget like sort and join,
make the minimum required frames configurable. Default is 512 KB.
Ext-ref: MB-62818
Change-Id: I9c01815bc3b0498486728898a41dde4da271d041
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18506
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 2d5b57d..d83f1ea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -55,6 +55,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -100,6 +101,11 @@
}
@Override
+ public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_TEXT_SEARCH);
+ }
+
+ @Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index c09cb34..574dc8a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -67,6 +67,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.junit.Assert;
import org.junit.Test;
@@ -78,6 +79,7 @@
private static final int FRAME_SIZE = 32768;
private static final int PARALLELISM = 10;
private static final long MAX_BUFFER_PER_CONNECTION = 1L;
+ private static final PhysicalOptimizationConfig physicalConfig = new PhysicalOptimizationConfig();
@Test
public void noBlockingPlan() throws AlgebricksException {
@@ -336,7 +338,7 @@
private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) {
for (PlanStage stage : stages) {
for (ILogicalOperator op : stage.getOperators()) {
- ((AbstractLogicalOperator) op).getPhysicalOperator().createLocalMemoryRequirements(op);
+ ((AbstractLogicalOperator) op).getPhysicalOperator().createLocalMemoryRequirements(op, physicalConfig);
}
}
final IClusterCapacity clusterCapacity =
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index ad6bfe4..a1736da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -41,7 +41,11 @@
"compiler\.indexonly" : true,
"compiler\.internal\.sanitycheck" : true,
"compiler\.joinmemory" : 262144,
+ "compiler.min.groupmemory" : 524288,
+ "compiler.min.joinmemory" : 524288,
"compiler\.min\.memory\.allocation" : true,
+ "compiler.min.sortmemory" : 524288,
+ "compiler.min.windowmemory" : 524288,
"compiler\.parallelism" : 0,
"compiler.queryplanshape" : "zigzag",
"compiler.runtime.memory.overhead" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 434c135..6f3bc38 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -41,7 +41,11 @@
"compiler\.indexonly" : true,
"compiler\.internal\.sanitycheck" : false,
"compiler\.joinmemory" : 262144,
+ "compiler.min.groupmemory" : 524288,
+ "compiler.min.joinmemory" : 524288,
"compiler\.min\.memory\.allocation" : true,
+ "compiler.min.sortmemory" : 524288,
+ "compiler.min.windowmemory" : 524288,
"compiler\.parallelism" : -1,
"compiler.queryplanshape" : "zigzag",
"compiler.runtime.memory.overhead" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 296ac47..415bdba 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -41,7 +41,11 @@
"compiler\.indexonly" : true,
"compiler\.internal\.sanitycheck" : false,
"compiler\.joinmemory" : 262144,
+ "compiler.min.groupmemory" : 524288,
+ "compiler.min.joinmemory" : 524288,
"compiler\.min\.memory\.allocation" : true,
+ "compiler.min.sortmemory" : 524288,
+ "compiler.min.windowmemory" : 524288,
"compiler\.parallelism" : 3,
"compiler.queryplanshape" : "zigzag",
"compiler.runtime.memory.overhead" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex
index 13bbece..7996c33 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex
@@ -1 +1 @@
-/memory\D+240844/
\ No newline at end of file
+/memory\D+688128/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex
index 13bbece..7996c33 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex
@@ -1 +1 @@
-/memory\D+240844/
\ No newline at end of file
+/memory\D+688128/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex
index 13bbece..7996c33 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex
@@ -1 +1 @@
-/memory\D+240844/
\ No newline at end of file
+/memory\D+688128/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex
index 13bbece..7996c33 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex
@@ -1 +1 @@
-/memory\D+240844/
\ No newline at end of file
+/memory\D+688128/
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index cfe7ce8..8ca5c94 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.common.config;
+import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_GROUP_BY;
+import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_JOIN;
+import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT;
+import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_WINDOW;
import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
@@ -66,6 +70,22 @@
INTEGER_BYTE_UNIT,
StorageUtil.getIntSizeInBytes(32, KILOBYTE),
"The page size (in bytes) for computation"),
+ COMPILER_MIN_SORTMEMORY(
+ LONG_BYTE_UNIT,
+ StorageUtil.getLongSizeInBytes(512, KILOBYTE),
+ "The min memory budget (in bytes) for a sort operator instance in a partition"),
+ COMPILER_MIN_JOINMEMORY(
+ LONG_BYTE_UNIT,
+ StorageUtil.getLongSizeInBytes(512, KILOBYTE),
+ "The min memory budget (in bytes) for a join operator instance in a partition"),
+ COMPILER_MIN_GROUPMEMORY(
+ LONG_BYTE_UNIT,
+ StorageUtil.getLongSizeInBytes(512, KILOBYTE),
+ "The min memory budget (in bytes) for a group by operator instance in a partition"),
+ COMPILER_MIN_WINDOWMEMORY(
+ LONG_BYTE_UNIT,
+ StorageUtil.getLongSizeInBytes(512, KILOBYTE),
+ "The min memory budget (in bytes) for a window operator instance in a partition"),
COMPILER_PARALLELISM(
INTEGER,
COMPILER_PARALLELISM_AS_STORAGE,
@@ -240,6 +260,26 @@
return accessor.getLong(Option.COMPILER_TEXTSEARCHMEMORY);
}
+ public int getMinSortMemoryFrames() {
+ int numFrames = (int) accessor.getLong(Option.COMPILER_MIN_SORTMEMORY) / getFrameSize();
+ return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_SORT);
+ }
+
+ public int getMinJoinMemoryFrames() {
+ int numFrames = (int) accessor.getLong(Option.COMPILER_MIN_JOINMEMORY) / getFrameSize();
+ return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_JOIN);
+ }
+
+ public int getMinGroupMemoryFrames() {
+ int numFrames = (int) accessor.getLong(Option.COMPILER_MIN_GROUPMEMORY) / getFrameSize();
+ return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_GROUP_BY);
+ }
+
+ public int getMinWindowMemoryFrames() {
+ int numFrames = (int) accessor.getLong(Option.COMPILER_MIN_WINDOWMEMORY) / getFrameSize();
+ return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_WINDOW);
+ }
+
public int getFrameSize() {
return accessor.getInt(Option.COMPILER_FRAMESIZE);
}
@@ -315,7 +355,7 @@
public int getSortMemoryFrames() {
int numFrames = (int) getSortMemorySize() / getFrameSize();
- return Math.max(numFrames, OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT);
+ return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_SORT);
}
public boolean isColumnFilter() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index c83b86e..160c04d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -40,9 +40,9 @@
public class OptimizationConfUtil {
public static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
- private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
- private static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
- private static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
+ public static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
+ public static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
+ public static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
public static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5; // see InvertedIndexPOperator
private OptimizationConfUtil() {
@@ -119,6 +119,10 @@
physOptConf.setForceJoinOrderMode(forceJoinOrder);
physOptConf.setQueryPlanShapeMode(queryPlanShape);
physOptConf.setColumnFilter(columnFilter);
+ physOptConf.setMinSortFrames(compilerProperties.getMinSortMemoryFrames());
+ physOptConf.setMinJoinFrames(compilerProperties.getMinJoinMemoryFrames());
+ physOptConf.setMinGroupFrames(compilerProperties.getMinGroupMemoryFrames());
+ physOptConf.setMinWindowFrames(compilerProperties.getMinWindowMemoryFrames());
// We should have already validated the parameter names at this point...
Set<String> filteredParameterNames = new HashSet<>(parameterNames);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index b0624e8..d1b1815 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
public interface IPhysicalOperator {
@@ -56,6 +57,8 @@
public void createLocalMemoryRequirements(ILogicalOperator op);
+ public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig);
+
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
index 99d87c3..bb8c488 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.exceptions.ErrorCode;
public abstract class AbstractGroupByPOperator extends AbstractPhysicalOperator {
@@ -86,6 +87,11 @@
}
@Override
+ public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinGroupFrames());
+ }
+
+ @Override
public String toString() {
return getOperatorTag().toString() + columnList;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
index c300392..bf2b612 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator {
@@ -65,4 +66,9 @@
public void createLocalMemoryRequirements(ILogicalOperator op) {
localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_JOIN);
}
+
+ @Override
+ public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinJoinFrames());
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 4bc7502..298ad38 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -42,6 +42,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.PlanCompiler;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -105,6 +106,11 @@
}
@Override
+ public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+ localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(1);
+ }
+
+ @Override
public void disableJobGenBelowMe() {
this.disableJobGenBelow = true;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index f941bdb..ff45322 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -47,6 +47,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator {
@@ -187,4 +188,9 @@
public void createLocalMemoryRequirements(ILogicalOperator op) {
localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_SORT);
}
+
+ @Override
+ public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinSortFrames());
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 0e9191f..9e25d85 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
@@ -69,6 +70,11 @@
}
@Override
+ public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+ localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinWindowFrames());
+ }
+
+ @Override
protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
index 78983b9..ead5f56 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
@@ -55,6 +56,11 @@
}
@Override
+ public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+ localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM);
+ }
+
+ @Override
protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index d167153..11171a1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -18,6 +18,11 @@
*/
package org.apache.hyracks.algebricks.core.rewriter.base;
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
+
import java.util.Properties;
import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
@@ -53,6 +58,10 @@
private static final String FORCE_JOIN_ORDER = "FORCE_JOIN_ORDER";
private static final String QUERY_PLAN_SHAPE = "QUERY_PLAN_SHAPE";
private static final String COLUMN_FILTER = "COLUMN_FILTER";
+ private static final String MIN_SORT_FRAMES = "MIN_SORT_FRAMES";
+ private static final String MIN_JOIN_FRAMES = "MIN_JOIN_FRAMES";
+ private static final String MIN_GROUP_FRAMES = "MIN_GROUP_FRAMES";
+ private static final String MIN_WINDOW_FRAMES = "MIN_WINDOW_FRAMES";
private final Properties properties = new Properties();
@@ -66,6 +75,11 @@
setInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
setInt(DEFAULT_EXTERNAL_GROUP_TABLE_SIZE, 10485767);
setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, 10485767);
+
+ setInt(MIN_SORT_FRAMES, MIN_FRAME_LIMIT_FOR_SORT);
+ setInt(MIN_JOIN_FRAMES, MIN_FRAME_LIMIT_FOR_JOIN);
+ setInt(MIN_GROUP_FRAMES, MIN_FRAME_LIMIT_FOR_GROUP_BY);
+ setInt(MIN_WINDOW_FRAMES, MIN_FRAME_LIMIT_FOR_WINDOW);
}
public int getFrameSize() {
@@ -169,6 +183,54 @@
setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, tableSize);
}
+ public int getMinSortFrames() {
+ return getInt(MIN_SORT_FRAMES, MIN_FRAME_LIMIT_FOR_SORT);
+ }
+
+ public void setMinSortFrames(int minSortFrames) {
+ if (minSortFrames < MIN_FRAME_LIMIT_FOR_SORT) {
+ throw new IllegalArgumentException(
+ "Minimum sort frames is " + MIN_FRAME_LIMIT_FOR_SORT + ", got " + minSortFrames);
+ }
+ setInt(MIN_SORT_FRAMES, minSortFrames);
+ }
+
+ public int getMinJoinFrames() {
+ return getInt(MIN_JOIN_FRAMES, MIN_FRAME_LIMIT_FOR_JOIN);
+ }
+
+ public void setMinJoinFrames(int minJoinFrames) {
+ if (minJoinFrames < MIN_FRAME_LIMIT_FOR_JOIN) {
+ throw new IllegalArgumentException(
+ "Minimum join frames is " + MIN_FRAME_LIMIT_FOR_JOIN + ", got " + minJoinFrames);
+ }
+ setInt(MIN_JOIN_FRAMES, minJoinFrames);
+ }
+
+ public int getMinGroupFrames() {
+ return getInt(MIN_GROUP_FRAMES, MIN_FRAME_LIMIT_FOR_GROUP_BY);
+ }
+
+ public void setMinGroupFrames(int minGroupFrames) {
+ if (minGroupFrames < MIN_FRAME_LIMIT_FOR_GROUP_BY) {
+ throw new IllegalArgumentException(
+ "Minimum group frames is " + MIN_FRAME_LIMIT_FOR_GROUP_BY + ", got " + minGroupFrames);
+ }
+ setInt(MIN_GROUP_FRAMES, minGroupFrames);
+ }
+
+ public int getMinWindowFrames() {
+ return getInt(MIN_WINDOW_FRAMES, MIN_FRAME_LIMIT_FOR_WINDOW);
+ }
+
+ public void setMinWindowFrames(int minWindowFrames) {
+ if (minWindowFrames < MIN_FRAME_LIMIT_FOR_WINDOW) {
+ throw new IllegalArgumentException(
+ "Minimum window frames is " + MIN_FRAME_LIMIT_FOR_WINDOW + ", got " + minWindowFrames);
+ }
+ setInt(MIN_WINDOW_FRAMES, minWindowFrames);
+ }
+
public boolean getSortParallel() {
return getBoolean(SORT_PARALLEL, AlgebricksConfig.SORT_PARALLEL_DEFAULT);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
index 4e859e4..0b87eae 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
@@ -75,7 +75,7 @@
/**
* Set memory requirements for all operators as follows:
* <ol>
- * <li>First call {@link IPhysicalOperator#createLocalMemoryRequirements(ILogicalOperator)}
+ * <li>First call {@link IPhysicalOperator#createLocalMemoryRequirements(ILogicalOperator, PhysicalOptimizationConfig)}
* to initialize each operator's {@link LocalMemoryRequirements} with minimal memory budget required by
* that operator</li>
* <li>Then increase memory requirements for certain operators as specified by {@link PhysicalOptimizationConfig}</li>
@@ -97,19 +97,23 @@
if (physOp.getLocalMemoryRequirements() != null) {
return false;
}
- computeLocalMemoryRequirements(op, createMemoryRequirementsConfigurator(context));
+ computeLocalMemoryRequirements(op, createMemoryRequirementsConfigurator(context), context);
return true;
}
private void computeLocalMemoryRequirements(AbstractLogicalOperator op,
- ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor) throws AlgebricksException {
+ ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor, IOptimizationContext context)
+ throws AlgebricksException {
IPhysicalOperator physOp = op.getPhysicalOperator();
if (physOp.getLocalMemoryRequirements() == null) {
- physOp.createLocalMemoryRequirements(op);
- if (physOp.getLocalMemoryRequirements() == null) {
- throw new IllegalStateException(physOp.getOperatorTag().toString());
- }
- if (memoryRequirementsVisitor != null) {
+ if (memoryRequirementsVisitor == null) {
+ // null means forcing the min memory budget from the physical optimization config
+ physOp.createLocalMemoryRequirements(op, context.getPhysicalOptimizationConfig());
+ } else {
+ physOp.createLocalMemoryRequirements(op);
+ if (physOp.getLocalMemoryRequirements() == null) {
+ throw new IllegalStateException(physOp.getOperatorTag().toString());
+ }
op.accept(memoryRequirementsVisitor, null);
}
}
@@ -117,13 +121,14 @@
AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
for (ILogicalPlan p : nested.getNestedPlans()) {
for (Mutable<ILogicalOperator> root : p.getRoots()) {
- computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(),
- memoryRequirementsVisitor);
+ computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(), memoryRequirementsVisitor,
+ context);
}
}
}
for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
- computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(), memoryRequirementsVisitor);
+ computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(), memoryRequirementsVisitor,
+ context);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 766fb26..59a4da4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -157,6 +157,7 @@
INVALID_STRING_UNICODE(127),
UNSUPPORTED_WRITE_SPEC(128),
JOB_REJECTED(129),
+ FRAME_BIGGER_THAN_SORT_MEMORY(130),
// Compilation error codes.
RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index fa52bc6..7da6bbd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -147,6 +147,7 @@
127 = Decoding error - %1$s
128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
129 = Job %1$s not run. Cluster is not accepting jobs
+130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. Used=%3$s, max=%4$s. Please increase the sort memory budget.
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index 20924d5..d4dfae1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -150,9 +151,8 @@
return true;
}
if (getFrameCount() == 0) {
- throw new HyracksDataException("The required memory=" + requiredMemory + " for the frame data="
- + inputTupleAccessor.getBuffer().capacity() + " is too big for the sorting buffer. Used="
- + totalMemoryUsed + ", max=" + maxSortMemory + ", please allocate bigger buffer size");
+ throw HyracksDataException.create(ErrorCode.FRAME_BIGGER_THAN_SORT_MEMORY,
+ inputTupleAccessor.getBuffer().capacity(), requiredMemory, totalMemoryUsed, maxSortMemory);
}
return false;
}