This change list includes several fixes:
1. Adds a rule to push subplan into group-by
2. Adds a rule to eliminate subplan with input cardinality one
3. Fix the nested running aggregate runtime
4. Adds a wrapper of FrameTupleAppender to internally flush full frames.
A TODO item is to clean up existing usages of FrameTupleAppender to use the wrapper, which makes the code simpler.
Change-Id: I647f9bce2f40700b18bdcad1fa64fb8f0a26838b
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/149
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Preston Carman <ecarm002@ucr.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 08f985b..f3cf0a4 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -12,77 +12,86 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-public class VariableUtilities {
-
- public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
- throws AlgebricksException {
- ILogicalOperatorVisitor<Void, Void> visitor = new UsedVariableVisitor(usedVariables);
- op.accept(visitor, null);
- }
-
- public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
- throws AlgebricksException {
- ILogicalOperatorVisitor<Void, Void> visitor = new ProducedVariableVisitor(producedVariables);
- op.accept(visitor, null);
- }
-
- public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
- throws AlgebricksException {
- ILogicalOperatorVisitor<Void, Void> visitor = new SchemaVariableVisitor(schemaVariables);
- op.accept(visitor, null);
- }
-
- public static void getUsedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
- throws AlgebricksException {
- // DFS traversal
- VariableUtilities.getUsedVariables(op, vars);
- for (Mutable<ILogicalOperator> c : op.getInputs()) {
- getUsedVariablesInDescendantsAndSelf(c.getValue(), vars);
- }
- }
-
- public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
- ITypingContext ctx) throws AlgebricksException {
- substituteVariables(op, v1, v2, true, ctx);
- }
-
- public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
- LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
- for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
- substituteVariablesInDescendantsAndSelf(childOp.getValue(), v1, v2, ctx);
- }
- substituteVariables(op, v1, v2, true, ctx);
- }
-
- public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
- boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
- ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(
- goThroughNts, ctx);
- op.accept(visitor, new Pair<LogicalVariable, LogicalVariable>(v1, v2));
- }
-
- public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {
- Set<T> varSet = new HashSet<T>();
- Set<T> varArgSet = new HashSet<T>();
- varSet.addAll(var);
- varArgSet.addAll(varArg);
- return varSet.equals(varArgSet);
- }
-
-}
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class VariableUtilities {
+
+ public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
+ throws AlgebricksException {
+ ILogicalOperatorVisitor<Void, Void> visitor = new UsedVariableVisitor(usedVariables);
+ op.accept(visitor, null);
+ }
+
+ public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
+ throws AlgebricksException {
+ ILogicalOperatorVisitor<Void, Void> visitor = new ProducedVariableVisitor(producedVariables);
+ op.accept(visitor, null);
+ }
+
+ public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
+ throws AlgebricksException {
+ ILogicalOperatorVisitor<Void, Void> visitor = new SchemaVariableVisitor(schemaVariables);
+ op.accept(visitor, null);
+ }
+
+ public static void getUsedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
+ throws AlgebricksException {
+ // DFS traversal
+ VariableUtilities.getUsedVariables(op, vars);
+ for (Mutable<ILogicalOperator> c : op.getInputs()) {
+ getUsedVariablesInDescendantsAndSelf(c.getValue(), vars);
+ }
+ }
+
+ public static void getProducedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
+ throws AlgebricksException {
+ // DFS traversal
+ VariableUtilities.getProducedVariables(op, vars);
+ for (Mutable<ILogicalOperator> c : op.getInputs()) {
+ getProducedVariablesInDescendantsAndSelf(c.getValue(), vars);
+ }
+ }
+
+ public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
+ ITypingContext ctx) throws AlgebricksException {
+ substituteVariables(op, v1, v2, true, ctx);
+ }
+
+ public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
+ LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
+ for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
+ substituteVariablesInDescendantsAndSelf(childOp.getValue(), v1, v2, ctx);
+ }
+ substituteVariables(op, v1, v2, true, ctx);
+ }
+
+ public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
+ boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
+ ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(
+ goThroughNts, ctx);
+ op.accept(visitor, new Pair<LogicalVariable, LogicalVariable>(v1, v2));
+ }
+
+ public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {
+ Set<T> varSet = new HashSet<T>();
+ Set<T> varArgSet = new HashSet<T>();
+ varSet.addAll(var);
+ varArgSet.addAll(varArg);
+ return varSet.equals(varArgSet);
+ }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 0838291..6648252 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -21,6 +21,7 @@
import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -99,6 +100,55 @@
}
}
+ /**
+ * Adds the free variables of the operator path from
+ * op to dest, where dest is a direct/indirect input operator of op in the query plan.
+ *
+ * @param op
+ * , the start operator.
+ * @param dest
+ * , the destination operator (a direct/indirect input operator).
+ * @param freeVars
+ * - The collection to which the free variables will be added.
+ */
+ public static void getFreeVariablesInPath(ILogicalOperator op, ILogicalOperator dest, Set<LogicalVariable> freeVars)
+ throws AlgebricksException {
+ Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+ VariableUtilities.getLiveVariables(op, freeVars);
+ collectUsedAndProducedVariablesInPath(op, dest, freeVars, producedVars);
+ freeVars.removeAll(producedVars);
+ }
+
+ /**
+ * @param op
+ * , the start operator.
+ * @param dest
+ * , the destination operator (a direct/indirect input operator).
+ * @param usedVars
+ * , the collection of used variables.
+ * @param producedVars
+ * , the collection of produced variables.
+ * @return if the current operator is on the path from the original start operator to the destination operator.
+ * @throws AlgebricksException
+ */
+ private static boolean collectUsedAndProducedVariablesInPath(ILogicalOperator op, ILogicalOperator dest,
+ Set<LogicalVariable> usedVars, Set<LogicalVariable> producedVars) throws AlgebricksException {
+ if (op == dest) {
+ return true;
+ }
+ boolean onPath = false;
+ for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+ if (collectUsedAndProducedVariablesInPath(childRef.getValue(), dest, usedVars, producedVars)) {
+ onPath = true;
+ }
+ }
+ if (onPath) {
+ VariableUtilities.getUsedVariables(op, usedVars);
+ VariableUtilities.getProducedVariables(op, producedVars);
+ }
+ return onPath;
+ }
+
public static void getFreeVariablesInSubplans(AbstractOperatorWithNestedPlans op, Set<LogicalVariable> freeVars)
throws AlgebricksException {
for (ILogicalPlan p : op.getNestedPlans()) {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java
new file mode 100644
index 0000000..1397956
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule eliminates a subplan with the following pattern:
+ * -- SUBPLAN
+ * -- OP (where OP produces exactly one tuple)
+ * The live variables at OP will not be used after SUBPLAN.
+ * Note: This rule must be applied after
+ * the RemoveRedundantVariablesRule (to avoid the lineage analysis of variable cardinality).
+ *
+ * @author yingyib
+ */
+public class EliminateSubplanWithInputCardinalityOneRule implements IAlgebraicRewriteRule {
+ /** The pointer to the topmost operator */
+ private Mutable<ILogicalOperator> rootRef;
+ /** Whether the rule has ever been invoked */
+ private boolean invoked = false;
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ if (!invoked) {
+ rootRef = opRef;
+ invoked = true;
+ }
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (op.getInputs().size() <= 0) {
+ return false;
+ }
+ boolean changed = false;
+ for (Mutable<ILogicalOperator> subplanRef : op.getInputs()) {
+ AbstractLogicalOperator op1 = (AbstractLogicalOperator) subplanRef.getValue();
+ if (op1.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+ continue;
+ }
+
+ SubplanOperator subplan = (SubplanOperator) op1;
+ Set<LogicalVariable> usedVarsUp = new ListSet<LogicalVariable>();
+ OperatorPropertiesUtil.getFreeVariablesInPath(rootRef.getValue(), subplan, usedVarsUp);
+ // TODO(buyingyi): figure out the rewriting for subplan operators with multiple subplans.
+ if (subplan.getNestedPlans().size() != 1) {
+ continue;
+ }
+
+ ILogicalOperator subplanInputOperator = subplan.getInputs().get(0).getValue();
+ Set<LogicalVariable> subplanInputVars = new ListSet<LogicalVariable>();
+ VariableUtilities.getLiveVariables(subplanInputOperator, subplanInputVars);
+ int subplanInputVarSize = subplanInputVars.size();
+ subplanInputVars.removeAll(usedVarsUp);
+ // Makes sure the free variables are only used in the subplan.
+ if (subplanInputVars.size() < subplanInputVarSize) {
+ continue;
+ }
+ Set<LogicalVariable> freeVars = new ListSet<LogicalVariable>();
+ OperatorPropertiesUtil.getFreeVariablesInSubplans(subplan, freeVars);
+ boolean cardinalityOne = isCardinalityOne(subplan.getInputs().get(0), freeVars);
+ if (cardinalityOne) {
+ /** If the cardinality of freeVars in the subplan is one, the subplan can be removed. */
+ ILogicalPlan plan = subplan.getNestedPlans().get(0);
+
+ List<Mutable<ILogicalOperator>> rootRefs = plan.getRoots();
+ // TODO(buyingyi): investigate the case of multi-root plans.
+ if (rootRefs.size() != 1) {
+ continue;
+ }
+ Set<Mutable<ILogicalOperator>> ntsSet = new ListSet<Mutable<ILogicalOperator>>();
+ findNts(rootRefs.get(0), ntsSet);
+
+ /** Replaces nts with the input operator of the subplan. */
+ for (Mutable<ILogicalOperator> nts : ntsSet) {
+ nts.setValue(subplanInputOperator);
+ }
+ subplanRef.setValue(rootRefs.get(0).getValue());
+ changed = true;
+ } else {
+ continue;
+ }
+ }
+ return changed;
+ }
+
+ /**
+ * Checks whether every input free variable has cardinality one.
+ *
+ * @param opRef
+ * the operator to be checked (including its input operators)
+ * @param freeVars
+ * variables to be checked for produced operators
+ * @return true if every input variable has cardinality one; false otherwise.
+ * @throws AlgebricksException
+ */
+ private boolean isCardinalityOne(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> freeVars)
+ throws AlgebricksException {
+ Set<LogicalVariable> varsWithCardinalityOne = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> varsLiveAtUnnestAndJoin = new ListSet<LogicalVariable>();
+ isCardinalityOne(opRef, freeVars, varsWithCardinalityOne, varsLiveAtUnnestAndJoin);
+ varsWithCardinalityOne.removeAll(varsLiveAtUnnestAndJoin);
+ return varsWithCardinalityOne.equals(freeVars);
+ }
+
+ /**
+ * Recursively adds variables which have cardinality one and are in the input free variable set.
+ *
+ * @param opRef
+ * , the current operator reference.
+ * @param freeVars
+ * , a set of variables.
+ * @param varsWithCardinalityOne
+ * , variables in the free variable set with cardinality one at the time they are created.
+ * @param varsLiveAtUnnestAndJoin
+ * , live variables at Unnest and Join. The cardinalities of those variables can become more than one
+ * even if their cardinalities were one at the time those variables were created.
+ * @throws AlgebricksException
+ */
+ private void isCardinalityOne(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> freeVars,
+ Set<LogicalVariable> varsWithCardinalityOne, Set<LogicalVariable> varsLiveAtUnnestAndJoin)
+ throws AlgebricksException {
+ AbstractLogicalOperator operator = (AbstractLogicalOperator) opRef.getValue();
+ List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>();
+ VariableUtilities.getProducedVariables(operator, producedVars);
+ if (operator.getOperatorTag() == LogicalOperatorTag.UNNEST
+ || operator.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+ || operator.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+ VariableUtilities.getLiveVariables(operator, varsLiveAtUnnestAndJoin);
+ }
+ if (operator.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ for (LogicalVariable producedVar : producedVars) {
+ if (freeVars.contains(producedVar)) {
+ varsWithCardinalityOne.add(producedVar);
+ }
+ }
+ }
+ if (varsWithCardinalityOne.size() == freeVars.size()) {
+ return;
+ }
+ for (Mutable<ILogicalOperator> childRef : operator.getInputs()) {
+ isCardinalityOne(childRef, freeVars, varsWithCardinalityOne, varsLiveAtUnnestAndJoin);
+ }
+ }
+
+ /**
+ * Finds the NestedTupleSource operators among the direct/indirect input operators of opRef.
+ *
+ * @param opRef
+ * , the current operator reference.
+ * @param ntsSet
+ * , the set of NestedTupleSource operator references.
+ */
+ private void findNts(Mutable<ILogicalOperator> opRef, Set<Mutable<ILogicalOperator>> ntsSet) {
+ int childSize = opRef.getValue().getInputs().size();
+ if (childSize == 0) {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ ntsSet.add(opRef);
+ }
+ return;
+ }
+ for (Mutable<ILogicalOperator> childRef : opRef.getValue().getInputs()) {
+ findNts(childRef, ntsSet);
+ }
+ }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
index 1c66317..ab7835d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
@@ -27,6 +27,7 @@
import org.apache.commons.lang3.mutable.MutableObject;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -160,6 +161,16 @@
testForNull = innerUnnest.getVariable();
break;
}
+ case RUNNINGAGGREGATE: {
+ ILogicalOperator inputToRunningAggregate = right.getInputs().get(0).getValue();
+ Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+ VariableUtilities.getProducedVariables(inputToRunningAggregate, producedVars);
+ if (!producedVars.isEmpty()) {
+ // Select [ $y != null ]
+ testForNull = producedVars.iterator().next();
+ }
+ break;
+ }
case DATASOURCESCAN: {
DataSourceScanOperator innerScan = (DataSourceScanOperator) right;
// Select [ $y != null ]
@@ -184,7 +195,8 @@
IFunctionInfo finfoNot = context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.NOT);
ScalarFunctionCallExpression nonNullTest = new ScalarFunctionCallExpression(finfoNot,
new MutableObject<ILogicalExpression>(isNullTest));
- SelectOperator selectNonNull = new SelectOperator(new MutableObject<ILogicalExpression>(nonNullTest), false, null);
+ SelectOperator selectNonNull = new SelectOperator(new MutableObject<ILogicalExpression>(nonNullTest), false,
+ null);
GroupByOperator g = new GroupByOperator();
Mutable<ILogicalOperator> newSubplanRef = new MutableObject<ILogicalOperator>(subplan);
NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(g));
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java
new file mode 100644
index 0000000..4dfede0
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule pushes a subplan on top of a group-by into the
+ * nested plan of the group-by.
+ *
+ * @author yingyib
+ */
+public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ ILogicalOperator parentOperator = opRef.getValue();
+ if (parentOperator.getInputs().size() <= 0) {
+ return false;
+ }
+ boolean changed = false;
+ for (Mutable<ILogicalOperator> ref : parentOperator.getInputs()) {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) ref.getValue();
+ /** Only processes subplan operator. */
+ if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+ AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ /** Only processes the case a group-by operator is the input of the subplan operator. */
+ if (child.getOperatorTag() == LogicalOperatorTag.GROUP) {
+ SubplanOperator subplan = (SubplanOperator) op;
+ GroupByOperator gby = (GroupByOperator) child;
+ List<ILogicalPlan> subplanNestedPlans = subplan.getNestedPlans();
+ List<ILogicalPlan> gbyNestedPlans = gby.getNestedPlans();
+ List<ILogicalPlan> subplanNestedPlansToRemove = new ArrayList<ILogicalPlan>();
+ for (ILogicalPlan subplanNestedPlan : subplanNestedPlans) {
+ List<Mutable<ILogicalOperator>> rootOpRefs = subplanNestedPlan.getRoots();
+ List<Mutable<ILogicalOperator>> rootOpRefsToRemove = new ArrayList<Mutable<ILogicalOperator>>();
+ for (Mutable<ILogicalOperator> rootOpRef : rootOpRefs) {
+ /** Gets free variables in the root operator of a nested plan and its descendants. */
+ Set<LogicalVariable> freeVars = new ListSet<LogicalVariable>();
+ VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), freeVars);
+ Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+ VariableUtilities.getProducedVariablesInDescendantsAndSelf(rootOpRef.getValue(),
+ producedVars);
+ freeVars.removeAll(producedVars);
+
+ /**
+ * Checks whether the above freeVars are all contained in live variables
+ * of one nested plan inside the group-by operator.
+ * If yes, then the subplan can be pushed into the nested plan of the group-by.
+ */
+ for (ILogicalPlan gbyNestedPlan : gbyNestedPlans) {
+ List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
+ for (Mutable<ILogicalOperator> gbyRootOpRef : gbyRootOpRefs) {
+ Set<LogicalVariable> liveVars = new ListSet<LogicalVariable>();
+ VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
+ if (liveVars.containsAll(freeVars)) {
+ /** Does the actual push. */
+ Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
+ ntsRef.setValue(gbyRootOpRef.getValue());
+ gbyRootOpRef.setValue(rootOpRef.getValue());
+ rootOpRefsToRemove.add(rootOpRef);
+ changed = true;
+ }
+ }
+ }
+ }
+ rootOpRefs.removeAll(rootOpRefsToRemove);
+ if (rootOpRefs.size() == 0) {
+ subplanNestedPlansToRemove.add(subplanNestedPlan);
+ }
+ }
+ subplanNestedPlans.removeAll(subplanNestedPlansToRemove);
+ if (subplanNestedPlans.size() == 0) {
+ ref.setValue(gby);
+ }
+ }
+ }
+ }
+ return changed;
+ }
+
+ private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> opRef) {
+ Mutable<ILogicalOperator> currentOpRef = opRef;
+ while (currentOpRef.getValue().getInputs().size() > 0) {
+ currentOpRef = currentOpRef.getValue().getInputs().get(0);
+ }
+ return currentOpRef;
+ }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 918be11..c8dc852 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -37,9 +37,9 @@
public class NestedPlansRunningAggregatorFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
- private AlgebricksPipeline[] subplans;
- private int[] keyFieldIdx;
- private int[] decorFieldIdx;
+ private final AlgebricksPipeline[] subplans;
+ private final int[] keyFieldIdx;
+ private final int[] decorFieldIdx;
public NestedPlansRunningAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx, int[] decorFieldIdx) {
this.subplans = subplans;
@@ -93,7 +93,6 @@
for (int i = 0; i < pipelines.length; i++) {
outputWriter.setInputIdx(i);
pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
- pipelines[i].forceFlush();
}
}
@@ -103,13 +102,16 @@
for (int i = 0; i < pipelines.length; i++) {
outputWriter.setInputIdx(i);
pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
- pipelines[i].forceFlush();
}
}
@Override
public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
+ for (int i = 0; i < pipelines.length; ++i) {
+ outputWriter.setInputIdx(i);
+ pipelines[i].close();
+ }
return false;
}
@@ -131,14 +133,7 @@
@Override
public void close() {
- for (int i = 0; i < pipelines.length; ++i) {
- try {
- outputWriter.setInputIdx(i);
- pipelines[i].close();
- } catch (HyracksDataException e) {
- throw new IllegalStateException(e);
- }
- }
+
}
};
}
@@ -165,15 +160,15 @@
private static class RunningAggregatorOutput implements IFrameWriter {
- private FrameTupleAccessor[] tAccess;
- private RecordDescriptor[] inputRecDesc;
+ private final FrameTupleAccessor[] tAccess;
+ private final RecordDescriptor[] inputRecDesc;
private int inputIdx;
- private ArrayTupleBuilder tb;
- private ArrayTupleBuilder gbyTb;
- private AlgebricksPipeline[] subplans;
- private IFrameWriter outputWriter;
- private ByteBuffer outputFrame;
- private FrameTupleAppender outputAppender;
+ private final ArrayTupleBuilder tb;
+ private final ArrayTupleBuilder gbyTb;
+ private final AlgebricksPipeline[] subplans;
+ private final IFrameWriter outputWriter;
+ private final ByteBuffer outputFrame;
+ private final FrameTupleAppender outputAppender;
public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
int numDecors, IFrameWriter outputWriter) throws HyracksDataException {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 7ecb288..2e1171c 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
@@ -76,9 +75,7 @@
@Override
public void open() throws HyracksDataException {
- IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
- outRecordDesc, groupFields, groupFields, writer);
- pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
+ pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
outRecordDesc, writer);
pgw.open();
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 600d641..154f2d1 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -28,15 +28,14 @@
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
private static final long serialVersionUID = 1L;
- private int[] outColumns;
- private IRunningAggregateEvaluatorFactory[] runningAggregates;
+ private final int[] outColumns;
+ private final IRunningAggregateEvaluatorFactory[] runningAggregates;
/**
* @param outColumns
@@ -83,17 +82,13 @@
}
return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IPointable p = VoidPointable.FACTORY.createPointable();
- private IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ private final IPointable p = VoidPointable.FACTORY.createPointable();
+ private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
+ private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
private boolean first = true;
@Override
public void open() throws HyracksDataException {
- if (!first) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- }
initAccessAppendRef(ctx);
if (first) {
first = false;
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
new file mode 100644
index 0000000..2de4256
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.comm.io;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * This class wraps the calls of FrameTupleAppender and
+ * allows user to not worry about flushing full frames.
+ * TODO(yingyib): cleanup existing usage of FrameTupleAppender.
+ *
+ * @author yingyib
+ */
+public class FrameTupleAppenderWrapper {
+    private final FrameTupleAppender frameTupleAppender;
+    private final ByteBuffer outputFrame;
+    private final IFrameWriter outputWriter;
+
+    public FrameTupleAppenderWrapper(FrameTupleAppender frameTupleAppender, ByteBuffer outputFrame,
+            IFrameWriter outputWriter) {
+        this.frameTupleAppender = frameTupleAppender;
+        this.outputFrame = outputFrame;
+        this.outputWriter = outputWriter;
+    }
+
+    public void open() throws HyracksDataException {
+        outputWriter.open();
+    }
+
+    public void flush() throws HyracksDataException {
+        if (frameTupleAppender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+        }
+    }
+
+    public void close() throws HyracksDataException {
+        outputWriter.close();
+    }
+
+    public void fail() throws HyracksDataException {
+        outputWriter.fail();
+    }
+
+    public void reset(ByteBuffer buffer, boolean clear) {
+        frameTupleAppender.reset(buffer, clear);
+    }
+
+    /**
+     * Appends a tuple while skipping zero-length fields; flushes the frame
+     * and retries once if the frame is full. Both the first attempt and the
+     * retry must use appendSkipEmptyField so the tuple encoding is identical.
+     */
+    public void appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
+            throws HyracksDataException {
+        if (!frameTupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void append(byte[] bytes, int offset, int length) throws HyracksDataException {
+        if (!frameTupleAppender.append(bytes, offset, length)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.append(bytes, offset, length)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) throws HyracksDataException {
+        if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
+        if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
+            int dataLen1) throws HyracksDataException {
+        if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+    public void appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) throws HyracksDataException {
+        if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
+            FrameUtils.flushFrame(outputFrame, outputWriter);
+            frameTupleAppender.reset(outputFrame, true);
+            if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 71af928..9ce70c1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -24,11 +24,9 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-class PreclusteredGroupOperatorNodePushable extends
- AbstractUnaryInputUnaryOutputOperatorNodePushable {
+class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private final IHyracksTaskContext ctx;
private final int[] groupFields;
private final IBinaryComparatorFactory[] comparatorFactories;
@@ -54,15 +52,13 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor, groupFields, groupFields, writer);
final ByteBuffer copyFrame = ctx.allocateFrame();
final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
copyFrameAccessor.reset(copyFrame);
ByteBuffer outFrame = ctx.allocateFrame();
final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(outFrame, true);
- pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDescriptor,
+ pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
outRecordDescriptor, writer);
pgw.open();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 114463f..b67e236 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -24,22 +24,22 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
public class PreclusteredGroupWriter implements IFrameWriter {
private final int[] groupFields;
private final IBinaryComparator[] comparators;
private final IAggregatorDescriptor aggregator;
private final AggregateState aggregateState;
- private final IFrameWriter writer;
private final ByteBuffer copyFrame;
private final FrameTupleAccessor inFrameAccessor;
private final FrameTupleAccessor copyFrameAccessor;
- private final ByteBuffer outFrame;
- private final FrameTupleAppender appender;
+ private final FrameTupleAppenderWrapper appenderWrapper;
private final ArrayTupleBuilder tupleBuilder;
private boolean outputPartial = false;
@@ -48,35 +48,36 @@
private boolean isFailed = false;
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
- IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
- IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
- this(ctx, groupFields, comparators, aggregator, inRecordDesc, outRecordDesc, writer);
+ IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
+ this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer);
this.outputPartial = outputPartial;
}
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
- IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
- IFrameWriter writer) throws HyracksDataException {
+ IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
this.groupFields = groupFields;
this.comparators = comparators;
- this.aggregator = aggregator;
+ this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields,
+ groupFields, writer);
this.aggregateState = aggregator.createAggregateStates();
- this.writer = writer;
copyFrame = ctx.allocateFrame();
inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
copyFrameAccessor.reset(copyFrame);
- outFrame = ctx.allocateFrame();
- appender = new FrameTupleAppender(ctx.getFrameSize());
+ ByteBuffer outFrame = ctx.allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(outFrame, true);
+ appenderWrapper = new FrameTupleAppenderWrapper(appender, outFrame, writer);
tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
}
@Override
public void open() throws HyracksDataException {
- writer.open();
+ appenderWrapper.open();
first = true;
}
@@ -133,15 +134,9 @@
lastTupleIndex, aggregateState) : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor,
lastTupleIndex, aggregateState);
- if (hasOutput
- && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new HyracksDataException("The output cannot be fit into a frame.");
- }
+ if (hasOutput) {
+ appenderWrapper.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize());
}
}
@@ -163,19 +158,17 @@
@Override
public void fail() throws HyracksDataException {
isFailed = true;
- writer.fail();
+ appenderWrapper.fail();
}
@Override
public void close() throws HyracksDataException {
if (!isFailed && !first) {
writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
+ appenderWrapper.flush();
}
aggregator.close();
aggregateState.close();
- writer.close();
+ appenderWrapper.close();
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
index 2a28dea..e695828 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
@@ -28,7 +28,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
@@ -111,9 +110,7 @@
for (int i = 0; i < comparators.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc,
- groupFields, groupFields, writer);
- PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator,
+ PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory,
this.inRecordDesc, this.outRecordDesc, writer, true);
pgw.open();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
index 1f9b358..2a580d3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
@@ -31,7 +31,6 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
@@ -55,8 +54,8 @@
private ByteBuffer outFrame;
private FrameTupleAppender outFrameAppender;
- private IFrameSorter frameSorter; // Used in External sort, no replacement
- // selection
+ private final IFrameSorter frameSorter; // Used in External sort, no replacement
+ // selection
private final int[] groupFields;
private final INormalizedKeyComputer firstKeyNkc;
@@ -67,7 +66,7 @@
private final int[] mergeSortFields;
private final int[] mergeGroupFields;
- private IBinaryComparator[] groupByComparators;
+ private final IBinaryComparator[] groupByComparators;
// Constructor for external sort, no replacement selection
public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
@@ -115,10 +114,8 @@
public void process() throws HyracksDataException {
IAggregatorDescriptorFactory aggregatorFactory = localSide ? partialAggregatorFactory : mergeAggregatorFactory;
- IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, partialAggRecordDesc, outRecordDesc,
- groupFields, groupFields, writer);
- PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, groupByComparators, aggregator,
- inputRecordDesc, outRecordDesc, writer, false);
+ PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, groupByComparators,
+ aggregatorFactory, inputRecordDesc, outRecordDesc, writer, false);
try {
if (runs.size() <= 0) {
pgw.open();
@@ -149,9 +146,7 @@
IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
aggregatorFactory = localSide ? mergeAggregatorFactory : partialAggregatorFactory;
- aggregator = aggregatorFactory.createAggregator(ctx, partialAggRecordDesc,
- partialAggRecordDesc, mergeGroupFields, mergeGroupFields, mergeResultWriter);
- pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregator,
+ pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregatorFactory,
partialAggRecordDesc, partialAggRecordDesc, mergeResultWriter, true);
pgw.open();
@@ -166,10 +161,8 @@
}
}
if (!runs.isEmpty()) {
- aggregator = mergeAggregatorFactory.createAggregator(ctx, partialAggRecordDesc, outRecordDesc,
- mergeGroupFields, mergeGroupFields, writer);
- pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregator,
- partialAggRecordDesc, outRecordDesc, writer, false);
+ pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators,
+ mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, writer, false);
pgw.open();
IFrameReader[] runCursors = new RunFileReader[runs.size()];
for (int i = 0; i < runCursors.length; i++) {