merged hyracks_asterix_stabilization r1792:1829 to hyracks_lsm_tree
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1836 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index d9496f8..2f53f9b 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -20,6 +20,7 @@
// private ArrayList<AggregateFunctionCallExpression> expressions;
// TODO type safe list of expressions
private List<Mutable<ILogicalExpression>> mergeExpressions;
+ private LogicalVariable partitioningVariable;
public AggregateOperator(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
super(variables, expressions);
@@ -68,6 +69,14 @@
return mergeExpressions;
}
+ public void setPartitioningVariable(LogicalVariable partitioningVariable) {
+ this.partitioningVariable = partitioningVariable;
+ }
+
+ public LogicalVariable getPartitioningVariable() {
+ return partitioningVariable;
+ }
+
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 56487cc..3a82ccd 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -71,6 +71,9 @@
for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {
exprRef.getValue().getUsedVariables(usedVariables);
}
+ if (op.getPartitioningVariable() != null) {
+ usedVariables.add(op.getPartitioningVariable());
+ }
return null;
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 1397a56..81dc2c2 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -16,10 +16,12 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -29,12 +31,15 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
@@ -62,7 +67,16 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
- return emptyUnaryRequirements();
+ AggregateOperator aggOp = (AggregateOperator) op;
+ if (aggOp.getExecutionMode() == ExecutionMode.PARTITIONED && aggOp.getPartitioningVariable() != null) {
+ StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+ Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
+ partitioningVariables.add(aggOp.getPartitioningVariable());
+ pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null), null);
+ return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ } else {
+ return emptyUnaryRequirements();
+ }
}
@Override
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index e6b296b..471b71c 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -68,7 +68,7 @@
runPhysicalOptimizations(plan, physicalRewrites);
StringBuilder sb2 = new StringBuilder();
PlanPrettyPrinter.printPlan(plan, sb2, ppvisitor, 0);
- AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Optimized Plan:\n" + sb2.toString());
+ AlgebricksConfig.ALGEBRICKS_LOGGER.info("Optimized Plan:\n" + sb2.toString());
}
private void runOptimizationSets(ILogicalPlan plan,
diff --git a/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java b/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
index 90ee5bb..b2a5215 100644
--- a/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
+++ b/hyracks-algebricks/hyracks-algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
@@ -16,7 +16,6 @@
import edu.uci.ics.hyracks.algebricks.rewriter.rules.FactorRedundantGroupAndDecorVarsRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.InferTypesRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.InlineVariablesRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForStandaloneAggregRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushLimitDownRule;
@@ -40,7 +39,10 @@
public final static List<IAlgebraicRewriteRule> buildNormalizationRuleCollection() {
List<IAlgebraicRewriteRule> normalization = new LinkedList<IAlgebraicRewriteRule>();
normalization.add(new EliminateSubplanRule());
- normalization.add(new IntroduceGroupByForStandaloneAggregRule());
+ // TODO: This rule is incorrect and has been removed. Its replacement in
+ // Asterix (PushAggFuncIntoStandaloneAggregateRule)
+ // is language-specific.
+ // normalization.add(new IntroduceGroupByForStandaloneAggregRule());
normalization.add(new BreakSelectIntoConjunctsRule());
normalization.add(new PushSelectIntoJoinRule());
normalization.add(new ExtractGbyExpressionsRule());
@@ -100,7 +102,6 @@
return physicalPlanRewrites;
}
-
public final static List<IAlgebraicRewriteRule> prepareForJobGenRuleCollection() {
List<IAlgebraicRewriteRule> prepareForJobGenRewrites = new LinkedList<IAlgebraicRewriteRule>();
prepareForJobGenRewrites.add(new IsolateHyracksOperatorsRule(
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
new file mode 100644
index 0000000..08271c1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -0,0 +1,144 @@
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+ return false;
+ }
+
+ /**
+ * Replace the original aggregate functions with their corresponding global aggregate function.
+ */
+ public void replaceOriginalAggFuncs(Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap) {
+ for (Map.Entry<AggregateFunctionCallExpression, SimilarAggregatesInfo> entry : toReplaceMap.entrySet()) {
+ SimilarAggregatesInfo sai = entry.getValue();
+ for (AggregateExprInfo aei : sai.simAggs) {
+ AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) aei.aggExprRef.getValue();
+ afce.setFunctionInfo(aei.newFunInfo);
+ afce.getArguments().clear();
+ afce.getArguments().add(new MutableObject<ILogicalExpression>(sai.stepOneResult));
+ }
+ }
+ }
+
+ protected Pair<Boolean, Mutable<ILogicalOperator>> tryToPushAgg(AggregateOperator initAgg,
+ GroupByOperator newGbyOp, Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap,
+ IOptimizationContext context) throws AlgebricksException {
+
+ ArrayList<LogicalVariable> pushedVars = new ArrayList<LogicalVariable>();
+ ArrayList<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<Mutable<ILogicalExpression>>();
+
+ List<LogicalVariable> initVars = initAgg.getVariables();
+ List<Mutable<ILogicalExpression>> initExprs = initAgg.getExpressions();
+ int numExprs = initVars.size();
+
+ // First make sure that all agg funcs are two step, otherwise we cannot use local aggs.
+ for (int i = 0; i < numExprs; i++) {
+ AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) initExprs.get(i).getValue();
+ if (!aggFun.isTwoStep()) {
+ return new Pair<Boolean, Mutable<ILogicalOperator>>(false, null);
+ }
+ }
+
+ boolean haveAggToReplace = false;
+ for (int i = 0; i < numExprs; i++) {
+ Mutable<ILogicalExpression> expRef = initExprs.get(i);
+ AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) expRef.getValue();
+ IFunctionInfo fi1 = aggFun.getStepOneAggregate();
+ // Clone the aggregate's args.
+ List<Mutable<ILogicalExpression>> newArgs = new ArrayList<Mutable<ILogicalExpression>>(aggFun
+ .getArguments().size());
+ for (Mutable<ILogicalExpression> er : aggFun.getArguments()) {
+ newArgs.add(new MutableObject<ILogicalExpression>(er.getValue().cloneExpression()));
+ }
+ IFunctionInfo fi2 = aggFun.getStepTwoAggregate();
+ SimilarAggregatesInfo inf = toReplaceMap.get(aggFun);
+ if (inf == null) {
+ inf = new SimilarAggregatesInfo();
+ LogicalVariable newAggVar = context.newVar();
+ pushedVars.add(newAggVar);
+ inf.stepOneResult = new VariableReferenceExpression(newAggVar);
+ inf.simAggs = new ArrayList<AggregateExprInfo>();
+ toReplaceMap.put(aggFun, inf);
+ AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
+ pushedExprs.add(new MutableObject<ILogicalExpression>(aggLocal));
+ }
+ AggregateExprInfo aei = new AggregateExprInfo();
+ aei.aggExprRef = expRef;
+ aei.newFunInfo = fi2;
+ inf.simAggs.add(aei);
+ haveAggToReplace = true;
+ }
+
+ if (!pushedVars.isEmpty()) {
+ AggregateOperator pushedAgg = new AggregateOperator(pushedVars, pushedExprs);
+ pushedAgg.setExecutionMode(ExecutionMode.LOCAL);
+ // If newGbyOp is null, then we optimizing an aggregate without group by.
+ if (newGbyOp != null) {
+ // Hook up the nested aggregate op with the outer group by.
+ NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(
+ newGbyOp));
+ nts.setExecutionMode(ExecutionMode.LOCAL);
+ pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(nts));
+ } else {
+ // The local aggregate operator is fed by the input of the original aggregate operator.
+ pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(initAgg.getInputs().get(0).getValue()));
+ // Set the partitioning variable in the local agg to ensure it is not projected away.
+ context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
+ LogicalVariable trueVar = context.newVar();
+ // Reintroduce assign op for the global agg partitioning var.
+ AssignOperator trueAssignOp = new AssignOperator(trueVar, new MutableObject<ILogicalExpression>(
+ ConstantExpression.TRUE));
+ trueAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(pushedAgg));
+ context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
+ initAgg.setPartitioningVariable(trueVar);
+ initAgg.getInputs().get(0).setValue(trueAssignOp);
+ }
+ return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
+ } else {
+ return new Pair<Boolean, Mutable<ILogicalOperator>>(haveAggToReplace, null);
+ }
+ }
+
+ protected class SimilarAggregatesInfo {
+ ILogicalExpression stepOneResult;
+ List<AggregateExprInfo> simAggs;
+ }
+
+ protected class AggregateExprInfo {
+ Mutable<ILogicalExpression> aggExprRef;
+ IFunctionInfo newFunInfo;
+ }
+
+ protected class BookkeepingInfo {
+ Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
+ Map<GroupByOperator, List<LogicalVariable>> modifyGbyMap = new HashMap<GroupByOperator, List<LogicalVariable>>();
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
new file mode 100644
index 0000000..c3d935c
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+
+public class IntroduceAggregateCombinerRule extends AbstractIntroduceCombinerRule {
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (context.checkIfInDontApplySet(this, op)) {
+ return false;
+ }
+ context.addToDontApplySet(this, op);
+ if (op.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+ return false;
+ }
+ AggregateOperator aggOp = (AggregateOperator) op;
+ if (aggOp.getExecutionMode() != ExecutionMode.PARTITIONED || aggOp.getPartitioningVariable() == null) {
+ return false;
+ }
+ Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
+ Pair<Boolean, Mutable<ILogicalOperator>> result = tryToPushAgg(aggOp, null, toReplaceMap, context);
+ if (!result.first || result.second == null) {
+ return false;
+ }
+ replaceOriginalAggFuncs(toReplaceMap);
+ context.computeAndSetTypeEnvironmentForOperator(aggOp);
+ return true;
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceCombinerRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceCombinerRule.java
deleted file mode 100644
index 7f987e4..0000000
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceCombinerRule.java
+++ /dev/null
@@ -1,323 +0,0 @@
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class IntroduceCombinerRule implements IAlgebraicRewriteRule {
-
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
- return false;
- }
-
- @Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
- if (context.checkIfInDontApplySet(this, op)) {
- return false;
- }
- context.addToDontApplySet(this, op);
- if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
- return false;
- }
- GroupByOperator gbyOp = (GroupByOperator) op;
- if (gbyOp.getExecutionMode() != ExecutionMode.PARTITIONED) {
- return false;
- }
-
- Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
- BookkeepingInfo bi = new BookkeepingInfo();
- bi.toReplaceMap = toReplaceMap;
- bi.modifGbyMap = new HashMap<GroupByOperator, List<LogicalVariable>>();
-
- GroupByOperator newGbyOp = opToPush(gbyOp, bi, context);
- if (newGbyOp == null) {
- return false;
- }
-
- for (Map.Entry<AggregateFunctionCallExpression, SimilarAggregatesInfo> entry : toReplaceMap.entrySet()) {
- SimilarAggregatesInfo sai = entry.getValue();
- for (AggregateExprInfo aei : sai.simAggs) {
- AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) aei.aggExprRef.getValue();
- afce.setFunctionInfo(aei.newFunInfo);
- afce.getArguments().clear();
- afce.getArguments().add(new MutableObject<ILogicalExpression>(sai.stepOneResult));
- }
- }
-
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
- LogicalVariable newDecorVar = context.newVar();
- newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
- p.second.setValue(new VariableReferenceExpression(newDecorVar));
- }
- newGbyOp.setExecutionMode(ExecutionMode.LOCAL);
- Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
- newGbyOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, v);
-
- Object v2 = gbyOp.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY);
- newGbyOp.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, v2);
-
- List<LogicalVariable> propagatedVars = new LinkedList<LogicalVariable>();
- VariableUtilities.getProducedVariables(newGbyOp, propagatedVars);
-
- Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
- OperatorPropertiesUtil.getFreeVariablesInSubplans(gbyOp, freeVars);
-
- for (LogicalVariable var : freeVars) {
- if (!propagatedVars.contains(var)) {
- LogicalVariable newDecorVar = context.newVar();
- newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
- VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
- newDecorVar, context);
- }
- }
-
- Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
- opRef3.setValue(newGbyOp);
- typeGby(newGbyOp, context);
- typeGby(gbyOp, context);
- return true;
- }
-
- private void typeGby(AbstractOperatorWithNestedPlans op, IOptimizationContext context) throws AlgebricksException {
- for (ILogicalPlan p : op.getNestedPlans()) {
- OperatorPropertiesUtil.typePlan(p, context);
- }
- context.computeAndSetTypeEnvironmentForOperator(op);
- }
-
- private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
- throws AlgebricksException {
-
- Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
- ILogicalOperator op3 = opRef3.getValue();
- GroupByOperator newGbyOp = new GroupByOperator();
- newGbyOp.getInputs().add(new MutableObject<ILogicalOperator>(op3));
- // copy annotations
- Map<String, Object> annotations = newGbyOp.getAnnotations();
- for (Entry<String, Object> a : gbyOp.getAnnotations().entrySet())
- annotations.put(a.getKey(), a.getValue());
-
- List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
-
- for (ILogicalPlan p : gbyOp.getNestedPlans()) {
- Pair<Boolean, ILogicalPlan> bip = tryToPushSubplan(p, gbyOp, newGbyOp, bi, gbyVars, context);
- if (!bip.first) {
- // for now, if we cannot push everything, give up
- return null;
- }
- ILogicalPlan pushedSubplan = bip.second;
- if (pushedSubplan != null) {
- newGbyOp.getNestedPlans().add(pushedSubplan);
- }
- }
-
- ArrayList<LogicalVariable> newOpGbyList = new ArrayList<LogicalVariable>();
- ArrayList<LogicalVariable> replGbyList = new ArrayList<LogicalVariable>();
- // find maximal sequence of variable
- for (Map.Entry<GroupByOperator, List<LogicalVariable>> e : bi.modifGbyMap.entrySet()) {
- List<LogicalVariable> varList = e.getValue();
- boolean see1 = true;
- int sz1 = newOpGbyList.size();
- int i = 0;
- for (LogicalVariable v : varList) {
- if (see1) {
- if (i < sz1) {
- LogicalVariable v2 = newOpGbyList.get(i);
- if (v != v2) {
- // cannot linearize
- return null;
- }
- } else {
- see1 = false;
- newOpGbyList.add(v);
- replGbyList.add(context.newVar());
- }
- i++;
- } else {
- newOpGbyList.add(v);
- replGbyList.add(context.newVar());
- }
- }
- }
- // set the vars in the new op
- int n = newOpGbyList.size();
- for (int i = 0; i < n; i++) {
- newGbyOp.addGbyExpression(replGbyList.get(i), new VariableReferenceExpression(newOpGbyList.get(i)));
- VariableUtilities.substituteVariables(gbyOp, newOpGbyList.get(i), replGbyList.get(i), false, context);
- }
- return newGbyOp;
- }
-
- private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan p, GroupByOperator oldGbyOp,
- GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context) {
- List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
- List<Mutable<ILogicalOperator>> toPushR = new ArrayList<Mutable<ILogicalOperator>>();
- for (Mutable<ILogicalOperator> r : p.getRoots()) {
- if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, toPushR)) {
- // for now, if we cannot push everything, give up
- return new Pair<Boolean, ILogicalPlan>(false, null);
- }
- }
- for (Mutable<ILogicalOperator> root : toPushR) {
- pushedRoots.add(root);
- }
- if (pushedRoots.isEmpty()) {
- return new Pair<Boolean, ILogicalPlan>(true, null);
- } else {
- return new Pair<Boolean, ILogicalPlan>(true, new ALogicalPlanImpl(pushedRoots));
- }
- }
-
- private boolean tryToPushRoot(Mutable<ILogicalOperator> r, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
- BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
- List<Mutable<ILogicalOperator>> toPushAccumulate) {
- AbstractLogicalOperator op1 = (AbstractLogicalOperator) r.getValue();
- if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
- return false;
- }
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
- if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
- AggregateOperator initAgg = (AggregateOperator) op1;
- Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, bi.toReplaceMap, context);
- if (!pOpRef.first) {
- return false;
- }
- Mutable<ILogicalOperator> opRef = pOpRef.second;
- if (opRef != null) {
- toPushAccumulate.add(opRef);
- }
- bi.modifGbyMap.put(oldGbyOp, gbyVars);
- return true;
- } else {
- while (op2.getOperatorTag() != LogicalOperatorTag.GROUP && op2.getInputs().size() == 1) {
- op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
- }
- if (op2.getOperatorTag() != LogicalOperatorTag.GROUP) {
- return false;
- }
- GroupByOperator nestedGby = (GroupByOperator) op2;
- List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
- List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
- concatGbyVars.addAll(gbyVars2);
- for (ILogicalPlan p : nestedGby.getNestedPlans()) {
- for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
- if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate)) {
- return false;
- }
- }
- }
- return true;
- }
- }
-
- private Pair<Boolean, Mutable<ILogicalOperator>> tryToPushAgg(AggregateOperator initAgg, GroupByOperator newGbyOp,
- Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap, IOptimizationContext context) {
-
- ArrayList<LogicalVariable> pushedVars = new ArrayList<LogicalVariable>();
- ArrayList<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<Mutable<ILogicalExpression>>();
-
- List<LogicalVariable> initVars = initAgg.getVariables();
- List<Mutable<ILogicalExpression>> initExprs = initAgg.getExpressions();
- int sz = initVars.size();
- for (int i = 0; i < sz; i++) {
- AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) initExprs.get(i).getValue();
- if (!aggFun.isTwoStep()) {
- return new Pair<Boolean, Mutable<ILogicalOperator>>(false, null);
- }
- }
-
- boolean haveAggToReplace = false;
- for (int i = 0; i < sz; i++) {
- Mutable<ILogicalExpression> expRef = initExprs.get(i);
- AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) expRef.getValue();
- IFunctionInfo fi1 = aggFun.getStepOneAggregate();
- List<Mutable<ILogicalExpression>> newArgs = new ArrayList<Mutable<ILogicalExpression>>(aggFun
- .getArguments().size());
- for (Mutable<ILogicalExpression> er : aggFun.getArguments()) {
- newArgs.add(new MutableObject<ILogicalExpression>(er.getValue().cloneExpression()));
- }
- // AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
- // pushedExprs.add(new Mutable<ILogicalExpression>(aggLocal));
-
- IFunctionInfo fi2 = aggFun.getStepTwoAggregate();
-
- SimilarAggregatesInfo inf = toReplaceMap.get(aggFun);
- if (inf == null) {
- inf = new SimilarAggregatesInfo();
- LogicalVariable newAggVar = context.newVar();
- pushedVars.add(newAggVar);
- inf.stepOneResult = new VariableReferenceExpression(newAggVar);
- inf.simAggs = new ArrayList<AggregateExprInfo>();
- toReplaceMap.put(aggFun, inf);
- AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
- pushedExprs.add(new MutableObject<ILogicalExpression>(aggLocal));
- }
- AggregateExprInfo aei = new AggregateExprInfo();
- aei.aggExprRef = expRef;
- aei.newFunInfo = fi2;
- inf.simAggs.add(aei);
- haveAggToReplace = true;
- }
-
- if (!pushedVars.isEmpty()) {
- AggregateOperator pushedAgg = new AggregateOperator(pushedVars, pushedExprs);
- pushedAgg.setExecutionMode(ExecutionMode.LOCAL);
- NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(newGbyOp));
- nts.setExecutionMode(ExecutionMode.LOCAL);
- pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(nts));
- return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
- } else {
- return new Pair<Boolean, Mutable<ILogicalOperator>>(haveAggToReplace, null);
- }
- }
-
- private class SimilarAggregatesInfo {
- ILogicalExpression stepOneResult;
- List<AggregateExprInfo> simAggs;
- }
-
- private class AggregateExprInfo {
- Mutable<ILogicalExpression> aggExprRef;
- IFunctionInfo newFunInfo;
- }
-
- private class BookkeepingInfo {
- Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap;
- Map<GroupByOperator, List<LogicalVariable>> modifGbyMap;
- }
-}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
new file mode 100644
index 0000000..5c5fdb1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
@@ -0,0 +1,218 @@
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+
+public class IntroduceGroupByCombinerRule extends AbstractIntroduceCombinerRule {
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (context.checkIfInDontApplySet(this, op)) {
+ return false;
+ }
+ context.addToDontApplySet(this, op);
+ if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+ return false;
+ }
+ GroupByOperator gbyOp = (GroupByOperator) op;
+ if (gbyOp.getExecutionMode() != ExecutionMode.PARTITIONED) {
+ return false;
+ }
+
+ BookkeepingInfo bi = new BookkeepingInfo();
+ GroupByOperator newGbyOp = opToPush(gbyOp, bi, context);
+ if (newGbyOp == null) {
+ return false;
+ }
+
+ replaceOriginalAggFuncs(bi.toReplaceMap);
+
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
+ LogicalVariable newDecorVar = context.newVar();
+ newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
+ p.second.setValue(new VariableReferenceExpression(newDecorVar));
+ }
+ newGbyOp.setExecutionMode(ExecutionMode.LOCAL);
+ Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
+ newGbyOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, v);
+
+ Object v2 = gbyOp.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY);
+ newGbyOp.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, v2);
+
+ List<LogicalVariable> propagatedVars = new LinkedList<LogicalVariable>();
+ VariableUtilities.getProducedVariables(newGbyOp, propagatedVars);
+
+ Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
+ OperatorPropertiesUtil.getFreeVariablesInSubplans(gbyOp, freeVars);
+
+ for (LogicalVariable var : freeVars) {
+ if (!propagatedVars.contains(var)) {
+ LogicalVariable newDecorVar = context.newVar();
+ newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
+ VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
+ newDecorVar, context);
+ }
+ }
+
+ Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
+ opRef3.setValue(newGbyOp);
+ typeGby(newGbyOp, context);
+ typeGby(gbyOp, context);
+ return true;
+ }
+
+ private void typeGby(AbstractOperatorWithNestedPlans op, IOptimizationContext context) throws AlgebricksException {
+ for (ILogicalPlan p : op.getNestedPlans()) {
+ OperatorPropertiesUtil.typePlan(p, context);
+ }
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ }
+
+ private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
+ throws AlgebricksException {
+ // Hook up input to new group-by.
+ Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
+ ILogicalOperator op3 = opRef3.getValue();
+ GroupByOperator newGbyOp = new GroupByOperator();
+ newGbyOp.getInputs().add(new MutableObject<ILogicalOperator>(op3));
+ // Copy annotations.
+ Map<String, Object> annotations = newGbyOp.getAnnotations();
+ annotations.putAll(gbyOp.getAnnotations());
+
+ List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
+ for (ILogicalPlan p : gbyOp.getNestedPlans()) {
+ Pair<Boolean, ILogicalPlan> bip = tryToPushSubplan(p, gbyOp, newGbyOp, bi, gbyVars, context);
+ if (!bip.first) {
+ // For now, if we cannot push everything, give up.
+ return null;
+ }
+ ILogicalPlan pushedSubplan = bip.second;
+ if (pushedSubplan != null) {
+ newGbyOp.getNestedPlans().add(pushedSubplan);
+ }
+ }
+
+ ArrayList<LogicalVariable> newOpGbyList = new ArrayList<LogicalVariable>();
+ ArrayList<LogicalVariable> replGbyList = new ArrayList<LogicalVariable>();
+ // Find maximal sequence of variable.
+ for (Map.Entry<GroupByOperator, List<LogicalVariable>> e : bi.modifyGbyMap.entrySet()) {
+ List<LogicalVariable> varList = e.getValue();
+ boolean see1 = true;
+ int sz1 = newOpGbyList.size();
+ int i = 0;
+ for (LogicalVariable v : varList) {
+ if (see1) {
+ if (i < sz1) {
+ LogicalVariable v2 = newOpGbyList.get(i);
+ if (v != v2) {
+ // cannot linearize
+ return null;
+ }
+ } else {
+ see1 = false;
+ newOpGbyList.add(v);
+ replGbyList.add(context.newVar());
+ }
+ i++;
+ } else {
+ newOpGbyList.add(v);
+ replGbyList.add(context.newVar());
+ }
+ }
+ }
+ // set the vars in the new op
+ int n = newOpGbyList.size();
+ for (int i = 0; i < n; i++) {
+ newGbyOp.addGbyExpression(replGbyList.get(i), new VariableReferenceExpression(newOpGbyList.get(i)));
+ VariableUtilities.substituteVariables(gbyOp, newOpGbyList.get(i), replGbyList.get(i), false, context);
+ }
+ return newGbyOp;
+ }
+
+ private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan nestedPlan, GroupByOperator oldGbyOp,
+ GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
+ throws AlgebricksException {
+ List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
+ for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
+ if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, pushedRoots)) {
+ // For now, if we cannot push everything, give up.
+ return new Pair<Boolean, ILogicalPlan>(false, null);
+ }
+ }
+ if (pushedRoots.isEmpty()) {
+ return new Pair<Boolean, ILogicalPlan>(true, null);
+ } else {
+ return new Pair<Boolean, ILogicalPlan>(true, new ALogicalPlanImpl(pushedRoots));
+ }
+ }
+
+ private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
+ BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
+ List<Mutable<ILogicalOperator>> toPushAccumulate) throws AlgebricksException {
+ AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
+ if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+ return false;
+ }
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+ if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ AggregateOperator initAgg = (AggregateOperator) op1;
+ Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, bi.toReplaceMap, context);
+ if (!pOpRef.first) {
+ return false;
+ }
+ Mutable<ILogicalOperator> opRef = pOpRef.second;
+ if (opRef != null) {
+ toPushAccumulate.add(opRef);
+ }
+ bi.modifyGbyMap.put(oldGbyOp, gbyVars);
+ return true;
+ } else {
+ while (op2.getOperatorTag() != LogicalOperatorTag.GROUP && op2.getInputs().size() == 1) {
+ op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
+ }
+ if (op2.getOperatorTag() != LogicalOperatorTag.GROUP) {
+ return false;
+ }
+ GroupByOperator nestedGby = (GroupByOperator) op2;
+ List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
+ List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
+ concatGbyVars.addAll(gbyVars2);
+ for (ILogicalPlan p : nestedGby.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
+ if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForStandaloneAggregRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForStandaloneAggregRule.java
deleted file mode 100644
index 5272347..0000000
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByForStandaloneAggregRule.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-/**
- * When aggregates appear w/o group-by, a default group by a constant is
- * introduced.
- */
-
-public class IntroduceGroupByForStandaloneAggregRule implements IAlgebraicRewriteRule {
-
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
- return false;
- }
-
- @Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
- if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
- return false;
- }
- Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
- if (op2.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
- return false;
- }
-
- AssignOperator assign = (AssignOperator) op;
- AggregateOperator agg = (AggregateOperator) op2;
- if (agg.getVariables().size() != 1) {
- return false;
- }
- LogicalVariable aggVar = agg.getVariables().get(0);
- List<LogicalVariable> used = new LinkedList<LogicalVariable>();
- VariableUtilities.getUsedVariables(assign, used);
- if (used.contains(aggVar)) {
- Mutable<ILogicalOperator> opRef3 = op2.getInputs().get(0);
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
- LogicalVariable gbyVar = context.newVar();
- // ILogicalExpression constOne = new ConstantExpression(new
- // IntegerLiteral(new Integer(1)));
- groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(gbyVar,
- new MutableObject<ILogicalExpression>(ConstantExpression.TRUE)));
- NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>());
- List<Mutable<ILogicalOperator>> aggInpList = agg.getInputs();
- aggInpList.clear();
- aggInpList.add(new MutableObject<ILogicalOperator>(nts));
- ILogicalPlan np1 = new ALogicalPlanImpl(opRef2);
- ArrayList<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
- nestedPlans.add(np1);
- GroupByOperator gbyOp = new GroupByOperator(groupByList,
- new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(), nestedPlans);
- Mutable<ILogicalOperator> opRefGby = new MutableObject<ILogicalOperator>(gbyOp);
- nts.getDataSourceReference().setValue(gbyOp);
- gbyOp.getInputs().add(opRef3);
- List<Mutable<ILogicalOperator>> asgnInpList = assign.getInputs();
- context.computeAndSetTypeEnvironmentForOperator(nts);
- context.computeAndSetTypeEnvironmentForOperator(agg);
- context.computeAndSetTypeEnvironmentForOperator(gbyOp);
- asgnInpList.clear();
- asgnInpList.add(opRefGby);
- return true;
- }
- return false;
- }
-
-}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/pom.xml b/hyracks-algebricks/hyracks-algebricks-tests/pom.xml
index e700b0b..9afb096 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/pom.xml
+++ b/hyracks-algebricks/hyracks-algebricks-tests/pom.xml
@@ -20,19 +20,6 @@
</configuration>
</plugin>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.7.2</version>
- <configuration>
- <forkMode>pertest</forkMode>
- <argLine>-enableassertions -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
- <includes>
- <include>**/*Test.java</include>
- <include>**/*Suite.java</include>
- </includes>
- </configuration>
- </plugin>
- <plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
diff --git a/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml b/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
index 93c0615..afe91d1 100644
--- a/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
+++ b/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
@@ -81,6 +81,7 @@
<configuration>
<hyracksServerHome>${basedir}/../../../hyracks-server/target/hyracks-server-${project.version}-binary-assembly</hyracksServerHome>
<hyracksCLIHome>${basedir}/../../../hyracks-cli/target/hyracks-cli-${project.version}-binary-assembly</hyracksCLIHome>
+ <jvmOptions>${jvm.extraargs}</jvmOptions>
</configuration>
<executions>
<execution>
diff --git a/hyracks-examples/text-example/textapp/pom.xml b/hyracks-examples/text-example/textapp/pom.xml
index 4f6fb3c..50b492b 100644
--- a/hyracks-examples/text-example/textapp/pom.xml
+++ b/hyracks-examples/text-example/textapp/pom.xml
@@ -81,6 +81,7 @@
<configuration>
<hyracksServerHome>${basedir}/../../../hyracks-server/target/hyracks-server-${project.version}-binary-assembly</hyracksServerHome>
<hyracksCLIHome>${basedir}/../../../hyracks-cli/target/hyracks-cli-${project.version}-binary-assembly</hyracksCLIHome>
+ <jvmOptions>${jvm.extraargs}</jvmOptions>
</configuration>
<executions>
<execution>
diff --git a/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/AbstractHyracksMojo.java b/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/AbstractHyracksMojo.java
index 972f584..f35c78a 100644
--- a/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/AbstractHyracksMojo.java
+++ b/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/AbstractHyracksMojo.java
@@ -24,6 +24,11 @@
import org.apache.maven.plugin.MojoExecutionException;
public abstract class AbstractHyracksMojo extends AbstractMojo {
+ /**
+ * @parameter
+ */
+ protected String jvmOptions;
+
protected Process launch(File command, String options, File workingDir) throws MojoExecutionException {
if (!command.isFile()) {
throw new MojoExecutionException(command.getAbsolutePath() + " is not an executable program");
@@ -61,8 +66,12 @@
for (int i = 0; i < optionsArray.length; ++i) {
commandWithOptions[i + 1] = optionsArray[i];
}
- Process proc = Runtime.getRuntime().exec(commandWithOptions, null,
- workingDir == null ? new File(".") : workingDir);
+ ProcessBuilder pb = new ProcessBuilder(commandWithOptions);
+ if (jvmOptions != null) {
+ pb.environment().put("JAVA_OPTS", jvmOptions);
+ }
+ pb.directory(workingDir == null ? new File(".") : workingDir);
+ Process proc = pb.start();
dump(proc.getInputStream());
dump(proc.getErrorStream());
return proc;
diff --git a/pom.xml b/pom.xml
index fbef96f..933c320 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,110 +1,135 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks</artifactId>
- <version>0.2.1-SNAPSHOT</version>
- <packaging>pom</packaging>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ <packaging>pom</packaging>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-release-plugin</artifactId>
- <version>2.0</version>
- <configuration>
- <goals>package source:jar javadoc:jar deploy:deploy</goals>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>versions-maven-plugin</artifactId>
- <version>1.2</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <forkMode>pertest</forkMode>
- <argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n -Xms1024m -Xmx1024m</argLine>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <properties>
+ <jvm.extraargs></jvm.extraargs>
+ </properties>
- <scm>
- <connection>scm:svn:https://hyracks.googlecode.com/svn/trunk/hyracks</connection>
- <developerConnection>scm:svn:https://hyracks.googlecode.com/svn/trunk/hyracks</developerConnection>
- <url>http://code.google.com/p/hyracks/source/browse/#svn/trunk/hyracks</url>
- </scm>
+ <profiles>
+ <profile>
+ <id>macosx</id>
+ <activation>
+ <os>
+ <name>mac os x</name>
+ </os>
+ <jdk>1.7</jdk>
+ </activation>
+ <properties>
+ <jvm.extraargs>-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.KQueueSelectorProvider
+ -Xms1024m -Xmx1024m</jvm.extraargs>
+ </properties>
+ </profile>
+ </profiles>
- <distributionManagement>
- <repository>
- <id>hyracks-releases</id>
- <url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-releases/</url>
- </repository>
- <snapshotRepository>
- <id>hyracks-snapshots</id>
- <url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-snapshots/</url>
- </snapshotRepository>
- </distributionManagement>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-release-plugin</artifactId>
+ <version>2.0</version>
+ <configuration>
+ <goals>package source:jar javadoc:jar deploy:deploy</goals>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>1.2</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions
+ -Djava.util.logging.config.file=${user.home}/logging.properties
+ -Xdebug
+ -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n
+ ${jvm.extraargs}</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-changelog-plugin</artifactId>
- </plugin>
- </plugins>
- </reporting>
+ <scm>
+ <connection>scm:svn:https://hyracks.googlecode.com/svn/trunk/hyracks</connection>
+ <developerConnection>scm:svn:https://hyracks.googlecode.com/svn/trunk/hyracks</developerConnection>
+ <url>http://code.google.com/p/hyracks/source/browse/#svn/trunk/hyracks</url>
+ </scm>
- <repositories>
- <repository>
- <id>hyracks-public</id>
- <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public/</url>
- </repository>
- <repository>
- <id>jboss-public</id>
- <url>https://repository.jboss.org/nexus/content/groups/public/</url>
- </repository>
- </repositories>
+ <distributionManagement>
+ <repository>
+ <id>hyracks-releases</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-releases/</url>
+ </repository>
+ <snapshotRepository>
+ <id>hyracks-snapshots</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-snapshots/</url>
+ </snapshotRepository>
+ </distributionManagement>
- <pluginRepositories>
- <pluginRepository>
- <id>hyracks-public</id>
- <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public/</url>
- <releases>
- <updatePolicy>always</updatePolicy>
- </releases>
- </pluginRepository>
- </pluginRepositories>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-changelog-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </reporting>
- <modules>
- <module>hyracks-ipc</module>
- <module>hyracks-api</module>
- <module>hyracks-dataflow-common</module>
- <module>hyracks-dataflow-std</module>
- <module>hyracks-dataflow-hadoop</module>
- <module>hyracks-control</module>
- <module>hyracks-net</module>
- <module>hyracks-data</module>
- <module>hyracks-cli</module>
- <module>hyracks-storage-common</module>
- <module>hyracks-storage-am-common</module>
- <module>hyracks-storage-am-btree</module>
- <module>hyracks-storage-am-invertedindex</module>
- <module>hyracks-storage-am-lsm-common</module>
- <module>hyracks-storage-am-lsm-btree</module>
- <module>hyracks-storage-am-lsm-rtree</module>
- <module>hyracks-storage-am-rtree</module>
- <module>hyracks-test-support</module>
- <module>hyracks-tests</module>
- <module>hyracks-server</module>
- <module>hyracks-examples</module>
- <module>hyracks-documentation</module>
- <module>hyracks-hadoop-compat</module>
- <module>hyracks-algebricks</module>
- <!--module>hyracks-yarn</module-->
- <module>hyracks-maven-plugins</module>
- </modules>
+ <repositories>
+ <repository>
+ <id>hyracks-public</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public/</url>
+ </repository>
+ <repository>
+ <id>jboss-public</id>
+ <url>https://repository.jboss.org/nexus/content/groups/public/</url>
+ </repository>
+ </repositories>
+
+ <pluginRepositories>
+ <pluginRepository>
+ <id>hyracks-public</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public/</url>
+ <releases>
+ <updatePolicy>always</updatePolicy>
+ </releases>
+ </pluginRepository>
+ </pluginRepositories>
+
+ <modules>
+ <module>hyracks-ipc</module>
+ <module>hyracks-api</module>
+ <module>hyracks-dataflow-common</module>
+ <module>hyracks-dataflow-std</module>
+ <module>hyracks-dataflow-hadoop</module>
+ <module>hyracks-control</module>
+ <module>hyracks-net</module>
+ <module>hyracks-data</module>
+ <module>hyracks-cli</module>
+ <module>hyracks-storage-common</module>
+ <module>hyracks-storage-am-common</module>
+ <module>hyracks-storage-am-btree</module>
+ <module>hyracks-storage-am-invertedindex</module>
+ <module>hyracks-storage-am-lsm-common</module>
+ <module>hyracks-storage-am-lsm-btree</module>
+ <module>hyracks-storage-am-lsm-rtree</module>
+ <module>hyracks-storage-am-rtree</module>
+ <module>hyracks-test-support</module>
+ <module>hyracks-tests</module>
+ <module>hyracks-server</module>
+ <module>hyracks-examples</module>
+ <module>hyracks-documentation</module>
+ <module>hyracks-hadoop-compat</module>
+ <module>hyracks-algebricks</module>
+ <!--module>hyracks-yarn</module -->
+ <module>hyracks-maven-plugins</module>
+ </modules>
</project>