pull common code into AbstractPreclusteredGroupByPOperator
Change-Id: Iff0ae50039b70082e4c63468b4a8220c78ace977
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1188
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 9c06d2c..ab68b68 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -38,10 +38,12 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -52,7 +54,6 @@
import org.apache.hyracks.algebricks.core.algebra.properties.PropertiesUtil;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysicalOperator {
@@ -231,6 +232,22 @@
return null;
}
+ protected int[] getFdColumns(GroupByOperator gby, IOperatorSchema inputSchema) throws AlgebricksException {
+ int numFds = gby.getDecorList().size();
+ int fdColumns[] = new int[numFds];
+ int j = 0;
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new AlgebricksException("pre-sorted group-by expects variable references.");
+ }
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ LogicalVariable decor = v.getVariableReference();
+ fdColumns[j++] = inputSchema.findVariable(decor);
+ }
+ return fdColumns;
+ }
+
private static LogicalVariable getLhsGbyVar(GroupByOperator gby, LogicalVariable var) {
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gby.getGroupByList()) {
ILogicalExpression e = ve.second.getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
index 6e8933c..2772ee7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
@@ -20,18 +20,12 @@
import java.util.List;
-import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -65,19 +59,8 @@
throws AlgebricksException {
int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
GroupByOperator gby = (GroupByOperator) op;
- int numFds = gby.getDecorList().size();
- int fdColumns[] = new int[numFds];
IVariableTypeEnvironment env = context.getTypeEnvironment(op);
- int j = 0;
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
- ILogicalExpression expr = p.second.getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new AlgebricksException("pre-sorted group-by expects variable references.");
- }
- VariableReferenceExpression v = (VariableReferenceExpression) expr;
- LogicalVariable decor = v.getVariableReference();
- fdColumns[j++] = inputSchemas[0].findVariable(decor);
- }
+ int fdColumns[] = getFdColumns(gby, inputSchemas[0]);
// compile subplans and set the gby op. schema accordingly
AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
IAggregatorDescriptorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index e0df2f9..a636d10 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -20,17 +20,12 @@
import java.util.List;
-import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
@@ -70,18 +65,7 @@
throws AlgebricksException {
int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
GroupByOperator gby = (GroupByOperator) op;
- int numFds = gby.getDecorList().size();
- int fdColumns[] = new int[numFds];
- int j = 0;
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
- ILogicalExpression expr = p.second.getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new AlgebricksException("pre-sorted group-by expects variable references.");
- }
- VariableReferenceExpression v = (VariableReferenceExpression) expr;
- LogicalVariable decor = v.getVariableReference();
- fdColumns[j++] = inputSchemas[0].findVariable(decor);
- }
+ int fdColumns[] = getFdColumns(gby, inputSchemas[0]);
// compile subplans and set the gby op. schema accordingly
AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
IAggregatorDescriptorFactory aggregatorFactory;