[NO ISSUE][COMP] Refactor physical operators for GroupBy and DistinctBy
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Introduce AbstractGroupByPOperator as the
base class for physical GroupBy operators
- Introduce AbstractDistinctByPOperator as the
base class for physical DistinctBy operators
- Set memory limit for MicroPreclusteredGroupByPOperator
- Remove unused PhysicalOperatorTag.HASH_GROUP_BY
- Rename GroupByOperator.getGbyVarList() to getGroupByVarList()
- Remove constructors that do not specify a memory limit from
PreclusteredGroupOperatorDescriptor and PreclusteredGroupWriter
Change-Id: I93f17a6a3e0df6587e518e18eea01ca54a289b4c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3364
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index efffda2..4314b3a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -167,10 +167,11 @@
serialAggExpr.setSourceLocation(expr.getSourceLocation());
aggOp.getExpressions().get(i).setValue(serialAggExpr);
}
- ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
- gby.getGroupByList(), physicalOptimizationConfig.getMaxFramesForGroupBy(),
- (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
- * physicalOptimizationConfig.getFrameSize());
+ ExternalGroupByPOperator externalGby =
+ new ExternalGroupByPOperator(gby.getGroupByVarList(),
+ physicalOptimizationConfig.getMaxFramesForGroupBy(),
+ (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+ * physicalOptimizationConfig.getFrameSize());
generateMergeAggregationExpressions(gby, context);
op.setPhysicalOperator(externalGby);
setToExternalGby = true;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 5d19134..404a8dc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -31,7 +31,6 @@
EXTERNAL_GROUP_BY,
EXTERNAL_LOOKUP,
FORWARD,
- HASH_GROUP_BY,
HASH_PARTITION_EXCHANGE,
HASH_PARTITION_MERGE_EXCHANGE,
HDFS_READER,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
index 49bf062..0b280f4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
@@ -89,7 +89,7 @@
return gByList;
}
- public List<LogicalVariable> getGbyVarList() {
+ public List<LogicalVariable> getGroupByVarList() {
List<LogicalVariable> varList = new ArrayList<>(gByList.size());
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gByList) {
ILogicalExpression expr = ve.second.getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 8535204..64d1b61 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -114,7 +114,7 @@
}
ILogicalOperator inputOp = op.getInputs().get(0).getValue();
long inputCardinality = inputOp.accept(this, arg);
- List<LogicalVariable> gbyVar = op.getGbyVarList();
+ List<LogicalVariable> gbyVar = op.getGroupByVarList();
if (inputCardinality == ONE_GROUP && keyVariables.containsAll(gbyVar)) {
keyVariables.clear();
return ONE;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index f5f5f96..eaac45d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -388,7 +388,7 @@
if (op1.getOperatorTag() == LogicalOperatorTag.GROUP) {
GroupByOperator gby = (GroupByOperator) op1;
LinkedList<LogicalVariable> tail = new LinkedList<LogicalVariable>();
- for (LogicalVariable v : gby.getGbyVarList()) {
+ for (LogicalVariable v : gby.getGroupByVarList()) {
tail.add(v);
// all values for gby vars. are the same
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractDistinctByPOperator.java
new file mode 100644
index 0000000..2352338
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractDistinctByPOperator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+
+public abstract class AbstractDistinctByPOperator extends AbstractPhysicalOperator {
+
+ protected List<LogicalVariable> columnList;
+
+ protected AbstractDistinctByPOperator(List<LogicalVariable> columnList) {
+ this.columnList = columnList;
+ }
+
+ public List<LogicalVariable> getDistinctByColumns() {
+ return columnList;
+ }
+
+ public void setDistinctByColumns(List<LogicalVariable> distinctByColumns) {
+ this.columnList = distinctByColumns;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
+
+ protected int[] getKeysAndDecs(IOperatorSchema inputSchema) {
+ int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchema);
+ int sz = inputSchema.getSize();
+ int fdSz = sz - columnList.size();
+ int[] fdColumns = new int[fdSz];
+ int j = 0;
+ for (LogicalVariable v : inputSchema) {
+ if (!columnList.contains(v)) {
+ fdColumns[j++] = inputSchema.findVariable(v);
+ }
+ }
+ int[] keysAndDecs = new int[keys.length + fdColumns.length];
+ for (int i = 0; i < keys.length; i++) {
+ keysAndDecs[i] = keys[i];
+ }
+ for (int i = 0; i < fdColumns.length; i++) {
+ keysAndDecs[i + keys.length] = fdColumns[i];
+ }
+ return keysAndDecs;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
new file mode 100644
index 0000000..ce6dedc
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+public abstract class AbstractGroupByPOperator extends AbstractPhysicalOperator {
+
+ protected List<LogicalVariable> columnList;
+
+ protected final int framesLimit;
+
+ protected AbstractGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
+ this.columnList = columnList;
+ this.framesLimit = framesLimit;
+ }
+
+ public List<LogicalVariable> getGroupByColumns() {
+ return columnList;
+ }
+
+ public void setGroupByColumns(List<LogicalVariable> columnList) {
+ this.columnList = columnList;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + columnList;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
index c604e5c..a81bf97 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
@@ -27,7 +27,6 @@
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -38,18 +37,11 @@
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-public abstract class AbstractPreSortedDistinctByPOperator extends AbstractPhysicalOperator {
+public abstract class AbstractPreSortedDistinctByPOperator extends AbstractDistinctByPOperator {
- protected List<LogicalVariable> columnList;
-
- public AbstractPreSortedDistinctByPOperator(List<LogicalVariable> columnList) {
- this.columnList = columnList;
- }
-
- public void setDistinctByColumns(List<LogicalVariable> distinctByColumns) {
- this.columnList = distinctByColumns;
+ protected AbstractPreSortedDistinctByPOperator(List<LogicalVariable> columnList) {
+ super(columnList);
}
@Override
@@ -78,31 +70,4 @@
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
-
- @Override
- public boolean expensiveThanMaterialization() {
- return true;
- }
-
- protected int[] getKeysAndDecs(IOperatorSchema inputSchema) {
- int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchema);
- int sz = inputSchema.getSize();
- int fdSz = sz - columnList.size();
- int[] fdColumns = new int[fdSz];
- int j = 0;
- for (LogicalVariable v : inputSchema) {
- if (!columnList.contains(v)) {
- fdColumns[j++] = inputSchema.findVariable(v);
- }
- }
- int[] keysAndDecs = new int[keys.length + fdColumns.length];
- for (int i = 0; i < keys.length; i++) {
- keysAndDecs[i] = keys[i];
- }
- for (int i = 0; i < fdColumns.length; i++) {
- keysAndDecs[i + keys.length] = fdColumns[i];
- }
- return keysAndDecs;
- }
-
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index c18d76c..23a411c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -57,25 +57,10 @@
import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
-public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysicalOperator {
+public abstract class AbstractPreclusteredGroupByPOperator extends AbstractGroupByPOperator {
- protected List<LogicalVariable> columnList;
-
- public AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
- this.columnList = columnList;
- }
-
- @Override
- public String toString() {
- return getOperatorTag().toString() + columnList;
- }
-
- public List<LogicalVariable> getGbyColumns() {
- return columnList;
- }
-
- public void setGbyColumns(List<LogicalVariable> gByColumns) {
- this.columnList = gByColumns;
+ protected AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
+ super(columnList, framesLimit);
}
// Obs: We don't propagate properties corresponding to decors, since they
@@ -170,7 +155,7 @@
IPhysicalOperator pop2 = op2.getPhysicalOperator();
if (pop2 instanceof AbstractPreclusteredGroupByPOperator) {
List<LogicalVariable> gbyColumns =
- ((AbstractPreclusteredGroupByPOperator) pop2).getGbyColumns();
+ ((AbstractPreclusteredGroupByPOperator) pop2).getGroupByColumns();
List<LogicalVariable> sndOrder = new ArrayList<>();
sndOrder.addAll(gbyColumns);
Set<LogicalVariable> freeVars = new HashSet<>();
@@ -230,7 +215,7 @@
}
List<FunctionalDependency> fdList = new ArrayList<>();
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decorPair : gby.getDecorList()) {
- List<LogicalVariable> hd = gby.getGbyVarList();
+ List<LogicalVariable> hd = gby.getGroupByVarList();
List<LogicalVariable> tl = new ArrayList<>();
tl.add(((VariableReferenceExpression) decorPair.second.getValue()).getVariableReference());
fdList.add(new FunctionalDependency(hd, tl));
@@ -301,18 +286,13 @@
return null;
}
- @Override
- public boolean expensiveThanMaterialization() {
- return true;
- }
-
// Returns the local structure property that is propagated from an input local structure property
// through a pre-clustered GROUP BY physical operator.
private ILocalStructuralProperty getPropagatedProperty(ILocalStructuralProperty lsp, GroupByOperator gby) {
PropertyType propertyType = lsp.getPropertyType();
if (propertyType == PropertyType.LOCAL_GROUPING_PROPERTY) {
// A new grouping property is generated.
- return new LocalGroupingProperty(new ListSet<>(gby.getGbyVarList()));
+ return new LocalGroupingProperty(new ListSet<>(gby.getGroupByVarList()));
} else {
LocalOrderProperty lop = (LocalOrderProperty) lsp;
List<OrderColumn> orderColumns = new ArrayList<>();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 652196d..927beae5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -66,28 +66,13 @@
import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
+public class ExternalGroupByPOperator extends AbstractGroupByPOperator {
private final long inputSize;
- private final int frameLimit;
- private List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
- public ExternalGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
- long fileSize) {
- this.frameLimit = frameLimit;
+ public ExternalGroupByPOperator(List<LogicalVariable> columnList, int framesLimit, long fileSize) {
+ super(columnList, framesLimit);
this.inputSize = fileSize;
- computeColumnSet(gbyList);
- }
-
- public void computeColumnSet(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList) {
- columnSet.clear();
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
- ILogicalExpression expr = p.second.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression v = (VariableReferenceExpression) expr;
- columnSet.add(v.getVariableReference());
- }
- }
}
@Override
@@ -96,19 +81,10 @@
}
@Override
- public String toString() {
- return getOperatorTag().toString() + columnSet;
- }
-
- @Override
public boolean isMicroOperator() {
return false;
}
- public List<LogicalVariable> getGbyColumns() {
- return columnSet;
- }
-
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
List<ILocalStructuralProperty> propsLocal = new LinkedList<ILocalStructuralProperty>();
@@ -138,7 +114,7 @@
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(
- new ListSet<LogicalVariable>(columnSet), context.getComputationNodeDomain()), null);
+ new ListSet<LogicalVariable>(columnList), context.getComputationNodeDomain()), null);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
@@ -149,7 +125,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- List<LogicalVariable> gbyCols = getGbyColumns();
+ List<LogicalVariable> gbyCols = getGroupByColumns();
int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
GroupByOperator gby = (GroupByOperator) op;
int numFds = gby.getDecorList().size();
@@ -262,14 +238,14 @@
JobGenHelper.variablesToAscNormalizedKeyComputerFactory(gbyCols, aggOpInputEnv, context);
// Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
- int memoryBudgetInBytes = context.getFrameSize() * frameLimit;
+ int memoryBudgetInBytes = context.getFrameSize() * framesLimit;
int groupByColumnsCount = gby.getGroupByList().size() + numFds;
int hashTableSize = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(memoryBudgetInBytes,
groupByColumnsCount, context.getFrameSize());
ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
- keyAndDecFields, frameLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory, mergeFactory,
- recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
+ keyAndDecFields, framesLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory,
+ mergeFactory, recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
gbyOpDesc.setSourceLocation(gby.getSourceLocation());
contributeOpDesc(builder, gby, gbyOpDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
@@ -282,10 +258,4 @@
int[] outputDependencyLabels = new int[] { 1 };
return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
}
-
- @Override
- public boolean expensiveThanMaterialization() {
- return true;
- }
-
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java
index fda879c..109b4f7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java
@@ -73,7 +73,7 @@
/* make fd columns part of the key but the comparator only compares the distinct key columns */
MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keysAndDecs,
- comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null);
+ comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null, -1);
runtime.setSourceLocation(op.getSourceLocation());
builder.contributeMicroOperator(op, runtime, recordDescriptor);
ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
index 13308a1..350bcfb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
@@ -39,8 +39,8 @@
public class MicroPreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
- public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
- super(columnList);
+ public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
+ super(columnList, framesLimit);
}
@Override
@@ -73,11 +73,10 @@
RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keys,
- comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null);
+ comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null, framesLimit);
runtime.setSourceLocation(gby.getSourceLocation());
builder.contributeMicroOperator(gby, runtime, recordDescriptor);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
}
-
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
index 7971e78..1f8451c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -72,7 +72,7 @@
if (originalLocalProperties != null) {
newLocalProperties = new ArrayList<>();
for (ILocalStructuralProperty lsp : originalLocalProperties) {
- ILocalStructuralProperty groupLocalLsp = lsp.regardToGroup(gby.getGbyVarList());
+ ILocalStructuralProperty groupLocalLsp = lsp.regardToGroup(gby.getGroupByVarList());
if (groupLocalLsp != null) {
// Adds the property that is satisfied in the context of a particular group.
newLocalProperties.add(groupLocalLsp);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index dd4c65f..61fc51f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -72,7 +72,7 @@
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
/* make fd columns part of the key but the comparator only compares the distinct key columns */
PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keysAndDecs,
- comparatorFactories, aggregatorFactory, recordDescriptor);
+ comparatorFactories, aggregatorFactory, recordDescriptor, false, -1);
opDesc.setSourceLocation(op.getSourceLocation());
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index e5076ce..b6faa36 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -43,12 +43,10 @@
public class PreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
private final boolean groupAll;
- private final int framesLimit;
public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll, int framesLimit) {
- super(columnList);
+ super(columnList, framesLimit);
this.groupAll = groupAll;
- this.framesLimit = framesLimit;
}
@Override
@@ -101,5 +99,4 @@
public String toString() {
return getOperatorTag().toString() + (groupAll ? "(ALL)" : "") + columnList;
}
-
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index f2a0b71..af8161f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -65,28 +65,13 @@
import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
-public class SortGroupByPOperator extends AbstractPhysicalOperator {
+public class SortGroupByPOperator extends AbstractGroupByPOperator {
- private final int frameLimit;
private final OrderColumn[] orderColumns;
- private final List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
- public SortGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
- OrderColumn[] orderColumns) {
- this.frameLimit = frameLimit;
+ public SortGroupByPOperator(List<LogicalVariable> columnList, int framesLimit, OrderColumn[] orderColumns) {
+ super(columnList, framesLimit);
this.orderColumns = orderColumns;
- computeColumnSet(gbyList);
- }
-
- private void computeColumnSet(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList) {
- columnSet.clear();
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
- ILogicalExpression expr = p.second.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression v = (VariableReferenceExpression) expr;
- columnSet.add(v.getVariableReference());
- }
- }
}
@Override
@@ -95,19 +80,10 @@
}
@Override
- public String toString() {
- return getOperatorTag().toString() + columnSet;
- }
-
- @Override
public boolean isMicroOperator() {
return false;
}
- private List<LogicalVariable> getGbyColumns() {
- return columnSet;
- }
-
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
List<ILocalStructuralProperty> propsLocal = new LinkedList<ILocalStructuralProperty>();
@@ -145,7 +121,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- List<LogicalVariable> gbyCols = getGbyColumns();
+ List<LogicalVariable> gbyCols = getGroupByColumns();
int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
GroupByOperator gby = (GroupByOperator) op;
int numFds = gby.getDecorList().size();
@@ -273,7 +249,7 @@
normalizedKeyFactory =
orderColumns[0].getOrder() == OrderKind.ASC ? nkcfProvider.getNormalizedKeyComputerFactory(type, true)
: nkcfProvider.getNormalizedKeyComputerFactory(type, false);
- SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, frameLimit, keys,
+ SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, framesLimit, keys,
keyAndDecFields, normalizedKeyFactory, compFactories, aggregatorFactory, mergeFactory,
partialAggRecordDescriptor, recordDescriptor, false);
gbyOpDesc.setSourceLocation(gby.getSourceLocation());
@@ -289,9 +265,4 @@
int[] outputDependencyLabels = new int[] { 1 };
return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
}
-
- @Override
- public boolean expensiveThanMaterialization() {
- return true;
- }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index 66ee453..61a843c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -43,13 +43,13 @@
private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
private final ILogicalPlan plan;
- private static final PhysicalOperatorTag[] hyracksOperators = new PhysicalOperatorTag[] {
- PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
- PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
- PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
- PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
- PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.REPLICATE, PhysicalOperatorTag.STABLE_SORT,
- PhysicalOperatorTag.UNION_ALL, PhysicalOperatorTag.FORWARD };
+ private static final PhysicalOperatorTag[] hyracksOperators =
+ new PhysicalOperatorTag[] { PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
+ PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
+ PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
+ PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
+ PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.REPLICATE,
+ PhysicalOperatorTag.STABLE_SORT, PhysicalOperatorTag.UNION_ALL, PhysicalOperatorTag.FORWARD };
public static final PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
public HeuristicOptimizer(ILogicalPlan plan,
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index be26702..d9746b2 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -140,7 +140,7 @@
Map<String, Object> annotations = newGbyOp.getAnnotations();
annotations.putAll(gbyOp.getAnnotations());
- List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
+ List<LogicalVariable> gbyVars = gbyOp.getGroupByVarList();
// Backup nested plans since tryToPushSubplan(...) may mutate them.
List<ILogicalPlan> gbyNestedPlans = gbyOp.getNestedPlans();
@@ -306,7 +306,7 @@
return true;
} else {
GroupByOperator nestedGby = (GroupByOperator) op3;
- List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
+ List<LogicalVariable> gbyVars2 = nestedGby.getGroupByVarList();
Set<LogicalVariable> freeVars = new HashSet<>();
// Removes non-free variables defined in the nested plan.
OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(nestedGby, freeVars);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
index 19dc21e..4368d39 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
@@ -59,7 +59,7 @@
if (!groupOp.isGroupAll()) {
return false;
}
- List<LogicalVariable> groupVars = groupOp.getGbyVarList();
+ List<LogicalVariable> groupVars = groupOp.getGroupByVarList();
List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList = groupOp.getDecorList();
if (!groupVars.isEmpty() || !decorList.isEmpty()) {
return false;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 7dc596c..67c1f1d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -59,12 +59,11 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreSortedDistinctByPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreclusteredGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -387,18 +386,12 @@
private void optimizeUsingConstraintsAndEquivClasses(AbstractLogicalOperator op) {
IPhysicalOperator pOp = op.getPhysicalOperator();
switch (pOp.getOperatorTag()) {
- case HASH_GROUP_BY:
- case EXTERNAL_GROUP_BY: {
- GroupByOperator gby = (GroupByOperator) op;
- ExternalGroupByPOperator hgbyOp = (ExternalGroupByPOperator) pOp;
- hgbyOp.computeColumnSet(gby.getGroupByList());
- break;
- }
+ case EXTERNAL_GROUP_BY:
case PRE_CLUSTERED_GROUP_BY:
case MICRO_PRE_CLUSTERED_GROUP_BY: {
GroupByOperator gby = (GroupByOperator) op;
- AbstractPreclusteredGroupByPOperator preSortedGby = (AbstractPreclusteredGroupByPOperator) pOp;
- preSortedGby.setGbyColumns(gby.getGbyVarList());
+ AbstractGroupByPOperator gbyPhysOp = (AbstractGroupByPOperator) pOp;
+ gbyPhysOp.setGroupByColumns(gby.getGroupByVarList());
break;
}
case PRE_SORTED_DISTINCT_BY:
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
index 192e318..4c57f21 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
@@ -103,7 +103,7 @@
//replace preclustered gby with sort gby
if (!groupByOperator.isGroupAll()) {
- op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByList(),
+ op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByVarList(),
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy(),
sortPhysicalOperator.getSortColumns()));
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index a7bf11e..1d5a7e9 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -124,7 +124,6 @@
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
IOptimizationContext context) throws AlgebricksException {
PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
@@ -175,10 +174,11 @@
boolean hasIntermediateAgg = generateMergeAggregationExpressions(gby, context);
if (hasIntermediateAgg) {
- ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
- gby.getGroupByList(), physicalOptimizationConfig.getMaxFramesForGroupBy(),
- (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
- * physicalOptimizationConfig.getFrameSize());
+ ExternalGroupByPOperator externalGby =
+ new ExternalGroupByPOperator(gby.getGroupByVarList(),
+ physicalOptimizationConfig.getMaxFramesForGroupBy(),
+ (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+ * physicalOptimizationConfig.getFrameSize());
op.setPhysicalOperator(externalGby);
break;
}
@@ -186,20 +186,12 @@
}
}
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
- List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
- ILogicalExpression expr = p.second.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
- columnList.add(varRef.getVariableReference());
- }
- }
if (topLevelOp) {
- op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
- context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
+ op.setPhysicalOperator(new PreclusteredGroupByPOperator(gby.getGroupByVarList(),
+ gby.isGroupAll(), context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
} else {
- op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(columnList));
+ op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
+ context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
}
break;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 4678887..b778d06 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -39,10 +39,11 @@
private final IAggregatorDescriptorFactory aggregatorFactory;
private final RecordDescriptor inRecordDesc;
private final RecordDescriptor outRecordDesc;
+ private final int framesLimit;
public MicroPreClusteredGroupRuntimeFactory(int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDesc, int[] projectionList) {
+ RecordDescriptor outRecordDesc, int[] projectionList, int framesLimit) {
super(projectionList);
// Obs: the projection list is currently ignored.
if (projectionList != null) {
@@ -53,6 +54,7 @@
this.aggregatorFactory = aggregatorFactory;
this.inRecordDesc = inRecordDesc;
this.outRecordDesc = outRecordDesc;
+ this.framesLimit = framesLimit;
}
@Override
@@ -70,7 +72,7 @@
@Override
public void open() throws HyracksDataException {
pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
- outRecordDesc, writer);
+ outRecordDesc, writer, false, false, framesLimit);
pgw.open();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 395b553..99967c1 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -443,7 +443,7 @@
RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor gby = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 3 },
- new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, npaaf, gbyDesc);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, npaaf, gbyDesc, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, gby,
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -853,7 +853,7 @@
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
MicroPreClusteredGroupRuntimeFactory gby = new MicroPreClusteredGroupRuntimeFactory(new int[] { 3 },
new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, npaaf, sortDesc, gbyDesc,
- null);
+ null, -1);
// the algebricks op.
IScalarEvaluatorFactory cond =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
index 739de74..8cf8401 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
@@ -39,18 +39,6 @@
public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor recordDescriptor) {
- this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, -1);
- }
-
- public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
- IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor recordDescriptor, int framesLimit) {
- this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, framesLimit);
- }
-
- public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
- IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor recordDescriptor, boolean groupAll, int framesLimit) {
super(spec, 1, 1);
this.groupFields = groupFields;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index bb731fe..14ce5ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -60,12 +60,6 @@
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
- this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false, -1);
- }
-
- public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
- IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, outputPartial,
false, -1);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index feba97e..da85e74 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -125,7 +125,7 @@
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
new FloatSumFieldAggregatorFactory(5, true) }),
- outputRec);
+ outputRec, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -214,7 +214,7 @@
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec);
+ outputRec, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -303,7 +303,7 @@
new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
new MinMaxStringFieldAggregatorFactory(15, true, false) }),
- outputRec);
+ outputRec, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -391,7 +391,7 @@
UTF8StringBinaryComparatorFactory.INSTANCE },
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
- outputRec);
+ outputRec, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -485,7 +485,7 @@
new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
new CountFieldAggregatorFactory(true), new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec);
+ outputRec, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
@@ -583,7 +583,7 @@
new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
new MinMaxStringFieldAggregatorFactory(15, true, false) }),
- outputRec);
+ outputRec, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index 508d017..367fc51d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -78,12 +78,11 @@
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group =
- new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- desc2);
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+ desc2, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC2_ID);
InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
@@ -92,12 +91,11 @@
RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group2 =
- new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- desc3);
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+ desc3, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -149,12 +147,11 @@
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group =
- new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- desc2);
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+ desc2, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
@@ -163,12 +160,11 @@
RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group2 =
- new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- desc3);
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+ desc3, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -221,12 +217,11 @@
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group =
- new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- desc2);
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+ desc2, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
@@ -235,12 +230,11 @@
RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group2 =
- new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- desc3);
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+ desc3, false, -1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index 43598a5..cf04bee 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -192,7 +192,7 @@
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- groupResultDesc);
+ groupResultDesc, false, -1);
createPartitionConstraint(spec, gBy, outSplits);
OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
spec.connect(sortGroupConn, sorter, 0, gBy, 0);