[ASTERIXDB-2796][RT] Fix IndexOutOfBoundsException in hash group-by
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
IndexOutOfBoundsException happens in hash group-by when WITH
clause exists. The variables from the WITH clause get propagated
through the decor variables of the group-by. The number of output
keys are the sum of both the group-by keys and decor variables, but
the hash functions & comparators used are only for the group-by keys.
- pass only the aggregated keys to the comparators and hash functions.
Change-Id: I19dc652872a1f030b6afa509711e7e0700e86856
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8743
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.01.ddl.sqlpp
new file mode 100644
index 0000000..7d2c396
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.01.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Testing that runtime of hash-group by handles decor variables
+ * Expected : SUCCESS
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE t1 AS {id: int};
+CREATE DATASET ds1(t1) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.02.update.sqlpp
new file mode 100644
index 0000000..e79903f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.02.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO ds1 {"id": 1, "dept_id": 1, "salary": 10, "area": "N"};
+INSERT INTO ds1 {"id": 2, "dept_id": 1, "salary": 10, "area": "N"};
+INSERT INTO ds1 {"id": 3, "dept_id": 1, "salary": 10, "area": "A"};
+INSERT INTO ds1 {"id": 4, "dept_id": 2, "salary": 20, "area": "N"};
+INSERT INTO ds1 {"id": 5, "dept_id": 2, "salary": 20, "area": "N"};
+INSERT INTO ds1 {"id": 6, "dept_id": 2, "salary": 20, "area": "A"};
+INSERT INTO ds1 {"id": 7, "dept_id": 3, "salary": 30, "area": "N"};
+INSERT INTO ds1 {"id": 8, "dept_id": 3, "salary": 30, "area": "N"};
+INSERT INTO ds1 {"id": 9, "dept_id": 3, "salary": 30, "area": "A"};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.03.query.sqlpp
new file mode 100644
index 0000000..39cd32f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.03.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+WITH a AS "N"
+FROM ds1 v
+WHERE v.area = a
+/*+ hash */
+GROUP BY v.dept_id
+SELECT v.dept_id, SUM(v.salary) AS total, a
+ORDER BY v.dept_id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/hash-group-by-decor/hash-group-by-decor.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/hash-group-by-decor/hash-group-by-decor.03.adm
new file mode 100644
index 0000000..c79ed9a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/hash-group-by-decor/hash-group-by-decor.03.adm
@@ -0,0 +1,3 @@
+{ "a": "N", "dept_id": 1, "total": 20 }
+{ "a": "N", "dept_id": 2, "total": 40 }
+{ "a": "N", "dept_id": 3, "total": 60 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 6d56e70..adbdf69 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5808,6 +5808,11 @@
<output-dir compare="Text">group-by-all-ASTERIXDB-2611</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="group-by">
+ <compilation-unit name="hash-group-by-decor">
+ <output-dir compare="Text">hash-group-by-decor</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="index-join">
<test-case FilePath="index-join">
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 945e36c..7515258 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -123,8 +123,8 @@
GroupByOperator gby = (GroupByOperator) op;
checkGroupAll(gby);
List<LogicalVariable> gbyCols = getGroupByColumns();
- int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
- int fdColumns[] = getFdColumns(gby, inputSchemas[0]);
+ int[] gbyColumns = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
+ int[] fdColumns = getFdColumns(gby, inputSchemas[0]);
if (gby.getNestedPlans().size() != 1) {
throw new AlgebricksException(
@@ -161,14 +161,6 @@
.add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv, context.getMetadataProvider()));
}
- int[] keyAndDecFields = new int[keys.length + fdColumns.length];
- for (i = 0; i < keys.length; ++i) {
- keyAndDecFields[i] = keys[i];
- }
- for (i = 0; i < fdColumns.length; i++) {
- keyAndDecFields[keys.length + i] = fdColumns[i];
- }
-
List<LogicalVariable> keyAndDecVariables = new ArrayList<LogicalVariable>();
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) {
keyAndDecVariables.add(p.first);
@@ -225,15 +217,14 @@
// Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
int frameSize = context.getFrameSize();
long memoryBudgetInBytes = localMemoryRequirements.getMemoryBudgetInBytes(frameSize);
- int numFds = gby.getDecorList().size();
- int groupByColumnsCount = gby.getGroupByList().size() + numFds;
+ int allColumns = gbyColumns.length + fdColumns.length;
int hashTableSize = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(memoryBudgetInBytes,
- groupByColumnsCount, frameSize);
+ allColumns, frameSize);
int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
long inputSize = framesLimit * (long) frameSize;
ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
- keyAndDecFields, framesLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory,
+ gbyColumns, fdColumns, framesLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory,
mergeFactory, recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
gbyOpDesc.setSourceLocation(gby.getSourceLocation());
contributeOpDesc(builder, gby, gbyOpDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
index 4287b26..0261106 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -146,7 +146,7 @@
try {
context.getPlanStabilityVerifier().recordPlanSignature(opRef);
} catch (AlgebricksException e) {
- throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
String.format("Illegal state before rule %s. %s", rule.getClass().getName(), e.getMessage()));
}
}
@@ -157,7 +157,7 @@
try {
context.getPlanStructureVerifier().verifyPlanStructure(opRef);
} catch (AlgebricksException e) {
- throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
String.format("Fired rule %s produced illegal %s", rule.getClass().getName(), e.getMessage()));
}
} else {
@@ -168,7 +168,7 @@
printRuleApplication(rule, "not fired, but failed sanity check: " + e.getMessage(), beforePlan,
getPlanString(opRef));
}
- throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
String.format("Non-fired rule %s unexpectedly %s", rule.getClass().getName(), e.getMessage()));
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index ab06488..f93d1f9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -128,7 +128,7 @@
context.getPlanStructureVerifier().verifyPlanStructure(opRef);
}
} catch (AlgebricksException e) {
- throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
String.format("Initial plan contains illegal %s", e.getMessage()));
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
index 3c468be..32dca00 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
@@ -126,12 +126,12 @@
for (LogicalVariable opProducedVar : opProducedVars) {
LogicalVariable leftBranchProducedVar = leftBranchProducedVarMap.get(opProducedVar);
if (leftBranchProducedVar == null) {
- throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE, op.getSourceLocation(),
"Cannot find " + opProducedVar);
}
LogicalVariable rightBranchProducedVar = rightBranchProducedVarMap.get(opProducedVar);
if (rightBranchProducedVar == null) {
- throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE, op.getSourceLocation(),
"Cannot find " + opProducedVar);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index a93da48..db60f1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -159,6 +159,7 @@
public static final int INSUFFICIENT_MEMORY = 123;
public static final int PARSING_ERROR = 124;
public static final int INVALID_INVERTED_LIST_TYPE_TRAITS = 125;
+ public static final int ILLEGAL_STATE = 126;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
@@ -170,7 +171,6 @@
public static final int INAPPLICABLE_HINT = 10006;
public static final int CROSS_PRODUCT_JOIN = 10007;
public static final int GROUP_ALL_DECOR = 10008;
- public static final int COMPILATION_ILLEGAL_STATE = 10009;
private static class Holder {
private static final Map<Integer, String> errorMessageMap;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 01895a6..c01f39a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -143,6 +143,7 @@
123 = Insufficient memory is provided for the join operators, please increase the join memory budget.
124 = Parsing error at %1$s line %2$s field %3$s: %4$s
125 = Invalid inverted list type traits: %1$s
+126 = Illegal state. %1$s
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
@@ -152,5 +153,4 @@
10005 = Operator is not implemented: %1$s
10006 = Could not apply %1$s hint: %2$s
10007 = Encountered a cross product join
-10008 = Inappropriate use of group by all with decor variables
-10009 = Illegal state. %1$s
\ No newline at end of file
+10008 = Inappropriate use of group by all with decor variables
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 3232527..4f0c304 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -66,7 +66,7 @@
@Override
public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize,
- long inputDataBytesSize, final int[] keyFields, final IBinaryComparator[] comparators,
+ long inputDataBytesSize, final int[] gbyFields, final int[] fdFields, final IBinaryComparator[] comparators,
final INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, final int framesLimit,
final int seed) throws HyracksDataException {
@@ -79,26 +79,47 @@
throw new HyracksDataException("The given frame limit is too small to partition the data.");
}
- final int[] intermediateResultKeys = new int[keyFields.length];
- for (int i = 0; i < keyFields.length; i++) {
- intermediateResultKeys[i] = i;
+ final int[] intermediateResultGbyFields = new int[gbyFields.length];
+ for (int i = 0; i < gbyFields.length; i++) {
+ intermediateResultGbyFields[i] = i;
+ }
+
+ final int[] allFields;
+ final int[] intermediateResultAllFields;
+ if (fdFields == null) {
+ // no need to combine gby and fd
+ allFields = gbyFields;
+ intermediateResultAllFields = intermediateResultGbyFields;
+ } else {
+ allFields = new int[gbyFields.length + fdFields.length];
+ intermediateResultAllFields = new int[gbyFields.length + fdFields.length];
+ int k = 0;
+ int position = 0;
+ for (int i = 0; i < gbyFields.length; i++, position++, k++) {
+ allFields[k] = gbyFields[i];
+ intermediateResultAllFields[k] = position;
+ }
+ for (int i = 0; i < fdFields.length; i++, position++, k++) {
+ allFields[k] = fdFields[i];
+ intermediateResultAllFields[k] = position;
+ }
}
final FrameTuplePairComparator ftpcInputCompareToAggregate =
- new FrameTuplePairComparator(keyFields, intermediateResultKeys, comparators);
+ new FrameTuplePairComparator(gbyFields, intermediateResultGbyFields, comparators);
final ITuplePartitionComputer tpc =
- new FieldHashPartitionComputerFamily(keyFields, hashFunctionFamilies).createPartitioner(seed);
+ new FieldHashPartitionComputerFamily(gbyFields, hashFunctionFamilies).createPartitioner(seed);
// For calculating hash value for the already aggregated tuples (not incoming tuples)
// This computer is required to calculate the hash value of a aggregated tuple
// while doing the garbage collection work on Hash Table.
final ITuplePartitionComputer tpcIntermediate =
- new FieldHashPartitionComputerFamily(intermediateResultKeys, hashFunctionFamilies)
+ new FieldHashPartitionComputerFamily(intermediateResultGbyFields, hashFunctionFamilies)
.createPartitioner(seed);
final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor, keyFields, intermediateResultKeys, null, -1);
+ outRecordDescriptor, allFields, intermediateResultAllFields, null, -1);
final AggregateState aggregateState = aggregator.createAggregateStates();
@@ -225,8 +246,8 @@
private void initStateTupleBuilder(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
stateTupleBuilder.reset();
- for (int k = 0; k < keyFields.length; k++) {
- stateTupleBuilder.addField(accessor, tIndex, keyFields[k]);
+ for (int k = 0; k < allFields.length; k++) {
+ stateTupleBuilder.addField(accessor, tIndex, allFields[k]);
}
aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);
}
@@ -246,10 +267,10 @@
hashTableForTuplePointer.getTuplePointer(hashEntryPid, tid, pointer);
bufferAccessor.reset(pointer);
outputTupleBuilder.reset();
- for (int k = 0; k < intermediateResultKeys.length; k++) {
+ for (int k = 0; k < intermediateResultAllFields.length; k++) {
outputTupleBuilder.addField(bufferAccessor.getBuffer().array(),
- bufferAccessor.getAbsFieldStartOffset(intermediateResultKeys[k]),
- bufferAccessor.getFieldLength(intermediateResultKeys[k]));
+ bufferAccessor.getAbsFieldStartOffset(intermediateResultAllFields[k]),
+ bufferAccessor.getFieldLength(intermediateResultAllFields[k]));
}
boolean hasOutput = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
index 629f211..c60b29a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -27,9 +27,11 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface ISpillableTableFactory extends Serializable {
+
ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int inputSizeInTuple, long dataBytesSize,
- int[] keyFields, IBinaryComparator[] comparatorFactories, INormalizedKeyComputer firstKeyNormalizerFactory,
- IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int framesLimit, int seed) throws HyracksDataException;
+ int[] gbyFields, int[] fdFields, IBinaryComparator[] comparatorFactories,
+ INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int framesLimit, int seed)
+ throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 20d223e..02cee04 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
@@ -47,7 +48,8 @@
private static final Logger LOGGER = LogManager.getLogger();
private final IHyracksTaskContext ctx;
private final Object stateId;
- private final int[] keyFields;
+ private final int[] gbyFields;
+ private final int[] fdFields; // nullable
private final IBinaryComparator[] comparators;
private final INormalizedKeyComputer firstNormalizerComputer;
private final IAggregatorDescriptorFactory aggregatorFactory;
@@ -63,15 +65,19 @@
private boolean isFailed = false;
public ExternalGroupBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int tableSize, long fileSize,
- int[] keyFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
+ int[] gbyFields, int[] fdFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
INormalizedKeyComputerFactory firstNormalizerFactory, IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor,
- ISpillableTableFactory spillableTableFactory) {
+ ISpillableTableFactory spillableTableFactory) throws HyracksDataException {
+ if (comparatorFactories.length != gbyFields.length) {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE, "mismatch in group by fields and comparators");
+ }
this.ctx = ctx;
this.stateId = stateId;
this.framesLimit = framesLimit;
this.aggregatorFactory = aggregatorFactory;
- this.keyFields = keyFields;
+ this.gbyFields = gbyFields;
+ this.fdFields = fdFields;
this.comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -88,7 +94,7 @@
@Override
public void open() throws HyracksDataException {
state = new ExternalGroupState(ctx.getJobletContext().getJobId(), stateId);
- ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx, tableSize, fileSize, keyFields,
+ ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx, tableSize, fileSize, gbyFields, fdFields,
comparators, firstNormalizerComputer, aggregatorFactory, inRecordDescriptor, outRecordDescriptor,
framesLimit, INIT_SEED);
RunFileWriter[] runFileWriters = new RunFileWriter[table.getNumPartitions()];
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index 6dea186..7e23ac3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -44,11 +44,11 @@
private static final int MERGE_ACTIVITY_ID = 1;
- private static final long serialVersionUID = 1L;
- private final int[] keyFields;
+ private static final long serialVersionUID = 2L;
+ private final int[] gbyFields;
+ private final int[] fdFields; // nullable
private final IBinaryComparatorFactory[] comparatorFactories;
private final INormalizedKeyComputerFactory firstNormalizerFactory;
-
private final IAggregatorDescriptorFactory partialAggregatorFactory;
private final IAggregatorDescriptorFactory intermediateAggregateFactory;
@@ -60,7 +60,7 @@
private final long fileSize;
public ExternalGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputSizeInTuple, long inputFileSize,
- int[] keyFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
+ int[] gbyFields, int[] fdFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
INormalizedKeyComputerFactory firstNormalizerFactory, IAggregatorDescriptorFactory partialAggregatorFactory,
IAggregatorDescriptorFactory intermediateAggregateFactory, RecordDescriptor partialAggRecordDesc,
RecordDescriptor outRecordDesc, ISpillableTableFactory spillableTableFactory) {
@@ -76,7 +76,8 @@
}
this.partialAggregatorFactory = partialAggregatorFactory;
this.intermediateAggregateFactory = intermediateAggregateFactory;
- this.keyFields = keyFields;
+ this.gbyFields = gbyFields;
+ this.fdFields = fdFields;
this.comparatorFactories = comparatorFactories;
this.firstNormalizerFactory = firstNormalizerFactory;
this.spillableTableFactory = spillableTableFactory;
@@ -93,13 +94,6 @@
this.fileSize = inputFileSize;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
- * (org.apache.hyracks.api.dataflow.IActivityGraphBuilder)
- */
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
@@ -126,7 +120,7 @@
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
return new ExternalGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), tableSize,
- fileSize, keyFields, framesLimit, comparatorFactories, firstNormalizerFactory,
+ fileSize, gbyFields, fdFields, framesLimit, comparatorFactories, firstNormalizerFactory,
partialAggregatorFactory, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
outRecDescs[0], spillableTableFactory);
}
@@ -145,8 +139,8 @@
throws HyracksDataException {
return new ExternalGroupWriteOperatorNodePushable(ctx,
new TaskId(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID), partition),
- spillableTableFactory, partialRecDesc, outRecDesc, framesLimit, keyFields, firstNormalizerFactory,
- comparatorFactories, intermediateAggregateFactory);
+ spillableTableFactory, partialRecDesc, outRecDesc, framesLimit, gbyFields, fdFields,
+ firstNormalizerFactory, comparatorFactories, intermediateAggregateFactory);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
index e8a1b76..1a6f4ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
@@ -42,6 +43,7 @@
public class ExternalGroupWriteOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable
implements IRunFileWriterGenerator {
+
private static final Logger LOGGER = LogManager.getLogger();
private final IHyracksTaskContext ctx;
private final Object stateId;
@@ -49,7 +51,8 @@
private final RecordDescriptor partialAggRecordDesc;
private final RecordDescriptor outRecordDesc;
private final IAggregatorDescriptorFactory mergeAggregatorFactory;
- private final int[] mergeGroupFields;
+ private final int[] gbyFields;
+ private final int[] fdFields; // nullable
private final IBinaryComparator[] groupByComparators;
private final int frameLimit;
private final INormalizedKeyComputer nmkComputer;
@@ -57,29 +60,38 @@
public ExternalGroupWriteOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
ISpillableTableFactory spillableTableFactory, RecordDescriptor partialAggRecordDesc,
- RecordDescriptor outRecordDesc, int framesLimit, int[] groupFields,
+ RecordDescriptor outRecordDesc, int framesLimit, int[] gbyFields, int[] fdFields,
INormalizedKeyComputerFactory nmkFactory, IBinaryComparatorFactory[] comparatorFactories,
- IAggregatorDescriptorFactory aggregatorFactory) {
+ IAggregatorDescriptorFactory aggregatorFactory) throws HyracksDataException {
+ if (comparatorFactories.length != gbyFields.length) {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE, "mismatch in group by fields and comparators");
+ }
this.ctx = ctx;
this.stateId = stateId;
this.spillableTableFactory = spillableTableFactory;
this.frameLimit = framesLimit;
this.nmkComputer = nmkFactory == null ? null : nmkFactory.createNormalizedKeyComputer();
-
this.partialAggRecordDesc = partialAggRecordDesc;
this.outRecordDesc = outRecordDesc;
-
this.mergeAggregatorFactory = aggregatorFactory;
//create merge group fields
- int numGroupFields = groupFields.length;
- mergeGroupFields = new int[numGroupFields];
- for (int i = 0; i < numGroupFields; i++) {
- mergeGroupFields[i] = i;
+ this.gbyFields = new int[gbyFields.length];
+ int position = 0;
+ for (int i = 0; i < this.gbyFields.length; i++, position++) {
+ this.gbyFields[i] = position;
+ }
+ if (fdFields != null) {
+ this.fdFields = new int[fdFields.length];
+ for (int i = 0; i < this.fdFields.length; i++, position++) {
+ this.fdFields[i] = position;
+ }
+ } else {
+ this.fdFields = null;
}
//setup comparators for grouping
- groupByComparators = new IBinaryComparator[Math.min(mergeGroupFields.length, comparatorFactories.length)];
+ groupByComparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < groupByComparators.length; i++) {
groupByComparators[i] = comparatorFactories[i].createBinaryComparator();
}
@@ -122,12 +134,12 @@
if (runs[i] != null) {
// Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
int memoryBudgetInBytes = ctx.getInitialFrameSize() * frameLimit;
- int groupByColumnsCount = mergeGroupFields.length;
- int hashTableCardinality = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(
- memoryBudgetInBytes, groupByColumnsCount, ctx.getInitialFrameSize());
+ int allFields = gbyFields.length + (fdFields == null ? 0 : fdFields.length);
+ int hashTableCardinality = ExternalGroupOperatorDescriptor
+ .calculateGroupByTableCardinality(memoryBudgetInBytes, allFields, ctx.getInitialFrameSize());
hashTableCardinality = Math.min(hashTableCardinality, numOfTuples[i]);
ISpillableTable partitionTable = spillableTableFactory.buildSpillableTable(ctx, hashTableCardinality,
- runs[i].getFileSize(), mergeGroupFields, groupByComparators, nmkComputer,
+ runs[i].getFileSize(), gbyFields, fdFields, groupByComparators, nmkComputer,
mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, frameLimit, level);
RunFileWriter[] runFileWriters = new RunFileWriter[partitionTable.getNumPartitions()];
int[] sizeInTuplesNextLevel =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
index 794ff98..067c9a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
@@ -33,8 +33,8 @@
// Sets a dummy variable.
IOperatorDescriptorRegistry spec = new JobSpecification(32768);
- ExternalGroupOperatorDescriptor eGByOp =
- new ExternalGroupOperatorDescriptor(spec, 0, 0, null, 4, null, null, null, null, null, null, null);
+ ExternalGroupOperatorDescriptor eGByOp = new ExternalGroupOperatorDescriptor(spec, 0, 0, null, null, 4, null,
+ null, null, null, null, null, null);
// Test 1: compiler.groupmemory: 512 bytes, frame size: 256 bytes, with 1 column group-by
long memoryBudgetInBytes = 512;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index 0a57232..ae718bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -163,17 +163,18 @@
int tableSize = 8;
long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, frameLimits, new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
- new FloatSumFieldAggregatorFactory(5, false) }),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
- new FloatSumFieldAggregatorFactory(3, false) }),
- outputRec, outputRec, new HashSpillableTableFactory(
- new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+ ExternalGroupOperatorDescriptor grouper =
+ new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, null, frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
+ new FloatSumFieldAggregatorFactory(5, false) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
+ new FloatSumFieldAggregatorFactory(3, false) }),
+ outputRec, outputRec, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -253,7 +254,7 @@
long fileSize = frameLimits * spec.getFrameSize();
ExternalGroupOperatorDescriptor grouper =
- new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, frameLimits,
+ new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, null, frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
@@ -341,17 +342,18 @@
int tableSize = 8;
long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, frameLimits, new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(15, true, true) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(2, true, true) }),
- outputRec, outputRec, new HashSpillableTableFactory(
- new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+ ExternalGroupOperatorDescriptor grouper =
+ new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, null, frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(2, true, true) }),
+ outputRec, outputRec, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -432,7 +434,7 @@
long fileSize = frameLimits * spec.getFrameSize();
ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, frameLimits,
+ keyFields, null, frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
@@ -527,7 +529,7 @@
long fileSize = frameLimits * spec.getFrameSize();
ExternalGroupOperatorDescriptor grouper =
- new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, frameLimits,
+ new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, null, frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
@@ -624,7 +626,7 @@
long fileSize = frameLimits * spec.getFrameSize();
ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, frameLimits,
+ keyFields, null, frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index c286e52..5a8ec34 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -126,7 +126,7 @@
int tableSize = 8;
ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, fileSize / spec.getFrameSize() + 1,
+ keyFields, null, fileSize / spec.getFrameSize() + 1,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
@@ -191,7 +191,7 @@
int tableSize = 8;
ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, fileSize / spec.getFrameSize() + 1,
+ keyFields, null, fileSize / spec.getFrameSize() + 1,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
index f1a4231..75ecc34 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
@@ -34,14 +35,14 @@
ExternalGroupWriteOperatorNodePushable mergeOperator;
@Override
- protected void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) {
+ protected void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) throws HyracksDataException {
ISpillableTableFactory tableFactory = new HashSpillableTableFactory(
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE });
buildOperator = new ExternalGroupBuildOperatorNodePushable(ctx, this.hashCode(), tableSize,
- numFrames * ctx.getInitialFrameSize(), keyFields, numFrames, comparatorFactories,
+ numFrames * ctx.getInitialFrameSize(), keyFields, null, numFrames, comparatorFactories,
normalizedKeyComputerFactory, partialAggrInPlace, inRecordDesc, outputRec, tableFactory);
mergeOperator = new ExternalGroupWriteOperatorNodePushable(ctx, this.hashCode(), tableFactory, outputRec,
- outputRec, numFrames, keyFieldsAfterPartial, normalizedKeyComputerFactory, comparatorFactories,
+ outputRec, numFrames, keyFieldsAfterPartial, null, normalizedKeyComputerFactory, comparatorFactories,
finalAggrInPlace);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index cf04bee..2bc742a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -153,7 +153,7 @@
IOperatorDescriptor gBy;
int[] keys = new int[] { 0 };
if ("hash".equalsIgnoreCase(algo)) {
- gBy = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
+ gBy = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, null, frameLimit,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
index 72660c0..a14b44d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
@@ -148,7 +148,7 @@
AbstractOperatorDescriptor grouper;
if (alg.equalsIgnoreCase("hash")) {// external hash graph
- grouper = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
+ grouper = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, null, frameLimit,
new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
new IntegerNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 209bf34..3bbeca8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -221,7 +221,7 @@
new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
ExternalGroupOperatorDescriptor gby = new ExternalGroupOperatorDescriptor(spec, tableSize,
- custFileSize + orderFileSize, new int[] { 6 }, memSize,
+ custFileSize + orderFileSize, new int[] { 6 }, null, memSize,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {