[ASTERIXDB-1812][RT] Budget the memory usage for pre-clustered group-by.
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Make the pre-clustered group-by operator respect the group-by memory
  budget (compiler.groupmemory) instead of accumulating group state
  without any limit.
- Raise a descriptive error when the byte size of a single group exceeds
  the budget, or when the configured budget is below the minimum.
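
The budget handed to the group writer is (frame limit - 2) * frame size,
i.e. one input and one output frame are deducted; the nested-plan
aggregator then compares the bytes accumulated for the current group
against that budget. A rough sketch of the check, with illustrative names
rather than the exact patched code:

    import org.apache.hyracks.api.exceptions.ErrorCode;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Illustrative helper (not part of the patch): fail fast once a single
    // group's accumulated bytes exceed the budget; a budget <= 0 means
    // "unbounded".
    final class GroupMemoryGuard {
        private GroupMemoryGuard() {
        }

        static void check(long usedBytes, long memoryBudget) throws HyracksDataException {
            if (memoryBudget > 0 && usedBytes > memoryBudget) {
                throw HyracksDataException.create(ErrorCode.GROUP_BY_MEMORY_BUDGET_EXCEEDS,
                        usedBytes, memoryBudget);
            }
        }
    }
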
Change-Id: I670269b0b8f446d06d8dd73202194574aa524e85
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1940
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 5aaf87b..d22ec54 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -162,8 +162,8 @@
}
ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
gby.getGroupByList(),
- physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
- (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
+ physicalOptimizationConfig.getMaxFramesForGroupBy(),
+ (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
* physicalOptimizationConfig.getFrameSize());
generateMergeAggregationExpressions(gby, context);
op.setPhysicalOperator(externalGby);
@@ -182,7 +182,8 @@
columnList.add(varRef.getVariableReference());
}
}
- op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+ op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+ context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
}
}
} else if (((AbstractLogicalOperator) (r0.getValue())).getOperatorTag()
@@ -196,7 +197,8 @@
columnList.add(varRef.getVariableReference());
}
}
- op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+ op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+ context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
} else {
throw new AlgebricksException("Unsupported nested operator within a group-by: "
+ ((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().name());
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql
index 493cbb0..4fa4f3e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql
@@ -25,6 +25,8 @@
use dataverse test;
+set "compiler.groupmemory" "32MB"
+
for $i in dataset('Line')
group by $partkey := $i.l_partkey with $i
order by $partkey
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp
index b5388a9..5a5009b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp
@@ -25,6 +25,7 @@
use test;
+set `compiler.groupmemory` `32MB`
select element {'partkey':partkey,'lines': (from g select value i) }
from Line as i
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp
new file mode 100644
index 0000000..92698ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+ l_orderkey : bigint,
+ l_partkey : bigint,
+ l_suppkey : bigint,
+ l_linenumber : bigint,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp
new file mode 100644
index 0000000..5fe734c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp
new file mode 100644
index 0000000..7379646
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `compiler.groupmemory` "576KB"
+
+SELECT *
+FROM LineItem AS l
+WHERE l.l_shipdate <= '1998-09-02'
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag, l.l_linestatus AS l_linestatus
+ORDER BY l_returnflag, l_linestatus
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
new file mode 100644
index 0000000..92698ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+ l_orderkey : bigint,
+ l_partkey : bigint,
+ l_suppkey : bigint,
+ l_linenumber : bigint,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
new file mode 100644
index 0000000..5fe734c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
new file mode 100644
index 0000000..ba1e7d4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SET `compiler.groupmemory` "576KB"
+
+SELECT l_returnflag AS l_returnflag,
+ l_linestatus AS l_linestatus,
+ g
+FROM LineItem AS l
+WHERE l.l_shipdate <= '1998-09-02'
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag, l.l_linestatus AS l_linestatus
+GROUP AS g
+ORDER BY l_returnflag, l_linestatus
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast
index 1b4831f..9747f1e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast
@@ -1,4 +1,5 @@
DataverseUse test
+Set compiler.groupmemory=32MB
Query:
SELECT ELEMENT [
RecordConstructor [
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 70b3486..16d4cb3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -2887,6 +2887,18 @@
<expected-error>Invalid item type: function agg-sum cannot process item type object in an input array (or multiset)</expected-error>
</compilation-unit>
</test-case>
+ <test-case FilePath="group-by">
+ <compilation-unit name="listify">
+ <output-dir compare="Text">none</output-dir>
+ <expected-error>The byte size of a single group</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="group-by">
+ <compilation-unit name="listify-2">
+ <output-dir compare="Text">none</output-dir>
+ <expected-error>The byte size of a single group</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="index-join">
<test-case FilePath="index-join">
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index 5a465f7..78e4795 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -44,10 +44,12 @@
public class PreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
private final boolean groupAll;
+ private final int framesLimit;
- public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll) {
+ public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll, int framesLimit) {
super(columnList);
this.groupAll = groupAll;
+ this.framesLimit = framesLimit;
}
@Override
@@ -86,7 +88,7 @@
context);
PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
- comparatorFactories, aggregatorFactory, recordDescriptor, groupAll);
+ comparatorFactories, aggregatorFactory, recordDescriptor, groupAll, framesLimit);
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 7340882..a242927 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -91,7 +91,7 @@
setInt(MAX_FRAMES_FOR_JOIN, frameLimit);
}
- public int getMaxFramesExternalGroupBy() {
+ public int getMaxFramesForGroupBy() {
int frameSize = getFrameSize();
return getInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 256 * MB) / frameSize));
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
index 576bd62..7ea1327 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
@@ -102,7 +102,7 @@
//replace preclustered gby with sort gby
if (!groupByOperator.isGroupAll()) {
op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByList(),
- context.getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy(),
+ context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy(),
sortPhysicalOperator.getSortColumns()));
}
// remove the stable sort operator
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 7dc59f6..16ed9cb 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -167,8 +167,8 @@
if (hasIntermediateAgg) {
ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
gby.getGroupByList(),
- physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
- (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
+ physicalOptimizationConfig.getMaxFramesForGroupBy(),
+ (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
* physicalOptimizationConfig.getFrameSize());
op.setPhysicalOperator(externalGby);
break;
@@ -187,7 +187,8 @@
}
}
if (topLevelOp) {
- op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+ op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+ context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
} else {
op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(columnList));
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 893aa61..94af04f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -53,7 +54,8 @@
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys, long memoryBudget)
+ throws HyracksDataException {
final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length);
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
@@ -86,6 +88,9 @@
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ // Checks the memory usage.
+ memoryUsageCheck();
+
for (int i = 0; i < pipelines.length; i++) {
pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
}
@@ -98,7 +103,10 @@
outputWriter.setInputIdx(i);
pipelines[i].close();
}
- // outputWriter.writeTuple(appender);
+
+ // Checks the memory usage.
+ memoryUsageCheck();
+
tupleBuilder.reset();
ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
byte[] data = tb.getByteArray();
@@ -136,6 +144,18 @@
}
+ // Checks the memory usage.
+ private void memoryUsageCheck() throws HyracksDataException {
+ if (memoryBudget > 0) {
+ ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+ byte[] data = tb.getByteArray();
+ if (data.length > memoryBudget) {
+ throw HyracksDataException.create(ErrorCode.GROUP_BY_MEMORY_BUDGET_EXCEEDS, data.length,
+ memoryBudget);
+ }
+ }
+ }
+
};
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 8b8e320..c261df8 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -60,7 +60,7 @@
@Override
public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
- final IFrameWriter writer) throws HyracksDataException {
+ final IFrameWriter writer, long memoryBudget) throws HyracksDataException {
final RunningAggregatorOutput outputWriter =
new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer);
// should enforce protocol
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index e4ac701..1e06c76 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -42,8 +42,8 @@
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
- throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+ long memoryBudget) throws HyracksDataException {
final int[] keys = keyFields;
/**
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 87f3cb4..50452c1 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -43,7 +43,8 @@
@Override
public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys, long memoryBudget)
+ throws HyracksDataException {
return new IAggregatorDescriptor() {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index e6fbc6f..b054faf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -122,6 +122,8 @@
public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86;
public static final int UNEQUAL_NUM_FILTERS_TREES = 87;
public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
+ public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
+ public static final int ILLEGAL_MEMORY_BUDGET = 90;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index d2e05e3..1d2143b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -105,5 +105,7 @@
86 = Found an unrecognized index file %1$s
87 = Unequal number of trees and filters found in %1$s
88 = Cannot modify index (Disk is full)
+89 = The byte size of a single group (%1$s bytes) exceeds the budget for a group by operator (%2$s bytes)
+90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes)
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
index 5d13332..d546e5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
@@ -32,14 +32,14 @@
*/
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer)
- throws HyracksDataException {
- return this
- .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer,
+ long memoryBudget) throws HyracksDataException {
+ return createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults,
+ memoryBudget);
}
abstract protected IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields,
- final int[] keyFieldsInPartialResults) throws HyracksDataException;
+ final int[] keyFieldsInPartialResults, long memoryBudget) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index cd79a30..43b9685 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -97,7 +97,7 @@
hashFunctionFamilies).createPartitioner(seed);
final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor, keyFields, intermediateResultKeys, null);
+ outRecordDescriptor, keyFields, intermediateResultKeys, null, -1);
final AggregateState aggregateState = aggregator.createAggregateStates();
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index 99c787c..d5ed713 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -29,6 +29,6 @@
IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
- IFrameWriter writer) throws HyracksDataException;
+ IFrameWriter writer, long memoryBudget) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index e326f39..595e2c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -28,7 +28,6 @@
import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.AggregateState;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
@@ -58,8 +57,8 @@
*/
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
- throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults,
+ long memoryBudget) throws HyracksDataException {
final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
for (int i = 0; i < aggregators.length; i++) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
index 0fe0f54..739de74 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
@@ -35,22 +35,30 @@
private final IBinaryComparatorFactory[] comparatorFactories;
private final IAggregatorDescriptorFactory aggregatorFactory;
private final boolean groupAll;
+ private final int framesLimit;
public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor recordDescriptor) {
- this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false);
+ this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, -1);
}
public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor recordDescriptor, boolean groupAll) {
+ RecordDescriptor recordDescriptor, int framesLimit) {
+ this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, framesLimit);
+ }
+
+ public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor recordDescriptor, boolean groupAll, int framesLimit) {
super(spec, 1, 1);
this.groupFields = groupFields;
this.comparatorFactories = comparatorFactories;
this.aggregatorFactory = aggregatorFactory;
outRecDescs[0] = recordDescriptor;
this.groupAll = groupAll;
+ this.framesLimit = framesLimit;
}
@Override
@@ -58,6 +66,6 @@
final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
return new PreclusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0], groupAll);
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0], groupAll, framesLimit);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 2acc4db..0a7444d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -36,12 +36,14 @@
private final RecordDescriptor inRecordDescriptor;
private final RecordDescriptor outRecordDescriptor;
private final boolean groupAll;
+ private final int frameLimit;
private PreclusteredGroupWriter pgw;
PreclusteredGroupOperatorNodePushable(IHyracksTaskContext ctx, int[] groupFields,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean groupAll) {
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean groupAll,
+ int frameLimit) {
this.ctx = ctx;
this.groupFields = groupFields;
this.comparatorFactories = comparatorFactories;
@@ -49,6 +51,7 @@
this.inRecordDescriptor = inRecordDescriptor;
this.outRecordDescriptor = outRecordDescriptor;
this.groupAll = groupAll;
+ this.frameLimit = frameLimit;
}
@Override
@@ -58,7 +61,7 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
- outRecordDescriptor, writer, false, groupAll);
+ outRecordDescriptor, writer, false, groupAll, frameLimit);
pgw.open();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 3105d42..db6102e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -51,28 +52,39 @@
private final boolean outputPartial;
private boolean first;
private boolean isFailed = false;
+ private final long memoryLimit;
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
- this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false);
+ this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false, -1);
}
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, outputPartial,
- false);
+ false, -1);
}
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial, boolean groupAll)
- throws HyracksDataException {
+ RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial, boolean groupAll,
+ int framesLimit) throws HyracksDataException {
this.groupFields = groupFields;
this.comparators = comparators;
+
+ if (framesLimit >= 0 && framesLimit <= 2) {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_MEMORY_BUDGET, "GROUP BY",
+ Long.toString(((long) (framesLimit)) * ctx.getInitialFrameSize()),
+ Long.toString(2L * ctx.getInitialFrameSize()));
+ }
+
+ // Deducts input/output frames.
+ this.memoryLimit = framesLimit <= 0 ? -1 : ((long) (framesLimit - 2)) * ctx.getInitialFrameSize();
this.aggregator =
- aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer);
+ aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer,
+ this.memoryLimit);
this.aggregateState = aggregator.createAggregateStates();
copyFrame = new VSizeFrame(ctx);
inFrameAccessor = new FrameTupleAccessor(inRecordDesc);