1. fix asterixdb issue 782
--- push nested pipeline before a nested group-by operator into the combiner group-by operator in the AbstractIntroduceGroupByCombinerRule
--- add a processNullTest abstract method in the AbstractIntroduceGroupByCombinerRule
-- fix the join order in a subplan
2. allow user-configurable buffer cache page size (B-tree page size) in Pregelix
commit 4d9a11d0c05281a41bbabe03066478fe851b3a2b
Author: buyingyi <buyingyi@gmail.com>
Change-Id: Ib7761370df8606c55ac34c126554319586e824f0
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/64
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
new file mode 100644
index 0000000..aba68f9
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+
+public abstract class AbstractIntroduceGroupByCombinerRule extends AbstractIntroduceCombinerRule {
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (context.checkIfInDontApplySet(this, op)) {
+ return false;
+ }
+ if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+ return false;
+ }
+ GroupByOperator gbyOp = (GroupByOperator) op;
+ if (gbyOp.getExecutionMode() != ExecutionMode.PARTITIONED) {
+ return false;
+ }
+
+ BookkeepingInfo bi = new BookkeepingInfo();
+ GroupByOperator newGbyOp = opToPush(gbyOp, bi, context);
+ if (newGbyOp == null) {
+ return false;
+ }
+
+ replaceOriginalAggFuncs(bi.toReplaceMap);
+
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
+ LogicalVariable newDecorVar = context.newVar();
+ newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
+ p.second.setValue(new VariableReferenceExpression(newDecorVar));
+ }
+ newGbyOp.setExecutionMode(ExecutionMode.LOCAL);
+ Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
+ newGbyOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, v);
+
+ Object v2 = gbyOp.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY);
+ newGbyOp.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, v2);
+
+ List<LogicalVariable> propagatedVars = new LinkedList<LogicalVariable>();
+ VariableUtilities.getProducedVariables(newGbyOp, propagatedVars);
+
+ Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
+ OperatorPropertiesUtil.getFreeVariablesInSubplans(gbyOp, freeVars);
+
+ for (LogicalVariable var : freeVars) {
+ if (!propagatedVars.contains(var)) {
+ LogicalVariable newDecorVar = context.newVar();
+ newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
+ VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
+ newDecorVar, context);
+ }
+ }
+
+ Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
+ opRef3.setValue(newGbyOp);
+ typeGby(newGbyOp, context);
+ typeGby(gbyOp, context);
+ context.addToDontApplySet(this, op);
+ return true;
+ }
+
+ private void typeGby(AbstractOperatorWithNestedPlans op, IOptimizationContext context) throws AlgebricksException {
+ for (ILogicalPlan p : op.getNestedPlans()) {
+ OperatorPropertiesUtil.typePlan(p, context);
+ }
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ }
+
+ private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
+ throws AlgebricksException {
+ // Hook up input to new group-by.
+ Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
+ ILogicalOperator op3 = opRef3.getValue();
+ GroupByOperator newGbyOp = new GroupByOperator();
+ newGbyOp.getInputs().add(new MutableObject<ILogicalOperator>(op3));
+ // Copy annotations.
+ Map<String, Object> annotations = newGbyOp.getAnnotations();
+ annotations.putAll(gbyOp.getAnnotations());
+
+ List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
+ for (ILogicalPlan p : gbyOp.getNestedPlans()) {
+ Pair<Boolean, ILogicalPlan> bip = tryToPushSubplan(p, gbyOp, newGbyOp, bi, gbyVars, context);
+ if (!bip.first) {
+ // For now, if we cannot push everything, give up.
+ return null;
+ }
+ ILogicalPlan pushedSubplan = bip.second;
+ if (pushedSubplan != null) {
+ newGbyOp.getNestedPlans().add(pushedSubplan);
+ }
+ }
+
+ ArrayList<LogicalVariable> newOpGbyList = new ArrayList<LogicalVariable>();
+ ArrayList<LogicalVariable> replGbyList = new ArrayList<LogicalVariable>();
+ // Find maximal sequence of variable.
+ for (Map.Entry<GroupByOperator, List<LogicalVariable>> e : bi.modifyGbyMap.entrySet()) {
+ List<LogicalVariable> varList = e.getValue();
+ boolean see1 = true;
+ int sz1 = newOpGbyList.size();
+ int i = 0;
+ for (LogicalVariable v : varList) {
+ if (see1) {
+ if (i < sz1) {
+ LogicalVariable v2 = newOpGbyList.get(i);
+ if (v != v2) {
+ // cannot linearize
+ return null;
+ }
+ } else {
+ see1 = false;
+ newOpGbyList.add(v);
+ replGbyList.add(context.newVar());
+ }
+ i++;
+ } else {
+ newOpGbyList.add(v);
+ replGbyList.add(context.newVar());
+ }
+ }
+ }
+ // set the vars in the new op
+ int n = newOpGbyList.size();
+ for (int i = 0; i < n; i++) {
+ newGbyOp.addGbyExpression(replGbyList.get(i), new VariableReferenceExpression(newOpGbyList.get(i)));
+ VariableUtilities.substituteVariables(gbyOp, newOpGbyList.get(i), replGbyList.get(i), false, context);
+ }
+ return newGbyOp;
+ }
+
+ private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan nestedPlan, GroupByOperator oldGbyOp,
+ GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
+ throws AlgebricksException {
+ List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
+ for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
+ if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, pushedRoots)) {
+ // For now, if we cannot push everything, give up.
+ return new Pair<Boolean, ILogicalPlan>(false, null);
+ }
+ }
+ if (pushedRoots.isEmpty()) {
+ return new Pair<Boolean, ILogicalPlan>(true, null);
+ } else {
+ return new Pair<Boolean, ILogicalPlan>(true, new ALogicalPlanImpl(pushedRoots));
+ }
+ }
+
+ private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
+ BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
+ List<Mutable<ILogicalOperator>> toPushAccumulate) throws AlgebricksException {
+ AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
+ if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+ return false;
+ }
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+ if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ AggregateOperator initAgg = (AggregateOperator) op1;
+ Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, bi.toReplaceMap, context);
+ if (!pOpRef.first) {
+ return false;
+ }
+ Mutable<ILogicalOperator> opRef = pOpRef.second;
+ if (opRef != null) {
+ toPushAccumulate.add(opRef);
+ }
+ bi.modifyGbyMap.put(oldGbyOp, gbyVars);
+ return true;
+ } else {
+ while (op2.getOperatorTag() != LogicalOperatorTag.GROUP && op2.getInputs().size() == 1) {
+ op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
+ }
+ if (op2.getOperatorTag() != LogicalOperatorTag.GROUP) {
+ return false;
+ }
+ GroupByOperator nestedGby = (GroupByOperator) op2;
+ List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
+ List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
+ concatGbyVars.addAll(gbyVars2);
+ for (ILogicalPlan p : nestedGby.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
+ if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate)) {
+ return false;
+ }
+ }
+ }
+
+ /***
+ * Push the nested pipeline which provides the input to the nested group operator into newGbyOp (the combined gby op).
+ * The change is to fix asterixdb issue 782.
+ */
+ Mutable<ILogicalOperator> nestedGbyInputRef = nestedGby.getInputs().get(0);
+ Mutable<ILogicalOperator> startOfPipelineRef = nestedGbyInputRef;
+ if (startOfPipelineRef.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ return true;
+ }
+
+ // move down the nested pipeline to find the start of the pipeline right upon the nested-tuple-source
+ boolean hasIsNullFunction = OperatorPropertiesUtil.isNullTest((AbstractLogicalOperator) startOfPipelineRef
+ .getValue());
+ while (startOfPipelineRef.getValue().getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ startOfPipelineRef = startOfPipelineRef.getValue().getInputs().get(0);
+ hasIsNullFunction = OperatorPropertiesUtil.isNullTest((AbstractLogicalOperator) startOfPipelineRef
+ .getValue());
+ }
+ //keep the old nested-tuple-source
+ Mutable<ILogicalOperator> oldNts = startOfPipelineRef.getValue().getInputs().get(0);
+
+ //move down the nested op in the new gby operator
+ Mutable<ILogicalOperator> newGbyNestedOpRef = toPushAccumulate.get(0);
+ while (newGbyNestedOpRef.getValue().getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ newGbyNestedOpRef = newGbyNestedOpRef.getValue().getInputs().get(0);
+ }
+
+ //insert the pipeline before nested gby into the new (combiner) gby's nested plan on top of the nested-tuple-source
+ startOfPipelineRef.getValue().getInputs().set(0, newGbyNestedOpRef.getValue().getInputs().get(0));
+ newGbyNestedOpRef.getValue().getInputs().set(0, nestedGbyInputRef);
+
+ //in the old gby operator, remove the nested pipeline since it is already pushed to the combiner gby
+ nestedGby.getInputs().set(0, oldNts);
+ List<LogicalVariable> aggProducedVars = new ArrayList<LogicalVariable>();
+ VariableUtilities.getProducedVariables(toPushAccumulate.get(0).getValue(), aggProducedVars);
+
+ if (hasIsNullFunction && aggProducedVars.size() != 0) {
+ // if the old nested pipeline contains a not-null-check, we need to convert it to a not-system-null-check in the non-local gby
+ processNullTest(context, nestedGby, aggProducedVars);
+ }
+
+ return true;
+ }
+ }
+
+ /**
+ * Deal with the case where the nested plan in the combiner gby operator has a null-test before invoking aggregation functions.
+ *
+ * @param context
+ * The optimization context.
+ * @param nestedGby
+ * The nested gby operator in the global gby operator's subplan.
+ * @param firstAggVar
+ * The first aggregation variable produced by the combiner gby.
+ */
+ protected abstract void processNullTest(IOptimizationContext context, GroupByOperator nestedGby,
+ List<LogicalVariable> aggregateVarsProducedByCombiner);
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
index 3cd1100..73fd966 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -76,6 +77,7 @@
}
ntsToEtsInSubplan(subplan, context);
+ cleanupJoins(subplan);
InnerJoinOperator join = new InnerJoinOperator(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
join.getInputs().add(opRef3);
opRef2.setValue(OperatorManipulationUtil.eliminateSingleSubplanOverEts(subplan));
@@ -90,6 +92,30 @@
return false;
}
+ private static void cleanupJoins(SubplanOperator s) {
+ for (ILogicalPlan p : s.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ cleanupJoins(r);
+ }
+ }
+ }
+
+ /** clean up joins that have one input branch that is empty tuple source */
+ private static void cleanupJoins(Mutable<ILogicalOperator> opRef) {
+ if (opRef.getValue() instanceof AbstractBinaryJoinOperator) {
+ for (Mutable<ILogicalOperator> inputRef : opRef.getValue().getInputs()) {
+ if (inputRef.getValue().getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
+ opRef.getValue().getInputs().remove(inputRef);
+ opRef.setValue(opRef.getValue().getInputs().get(0).getValue());
+ break;
+ }
+ }
+ }
+ for (Mutable<ILogicalOperator> inputRef : opRef.getValue().getInputs()) {
+ cleanupJoins(inputRef);
+ }
+ }
+
private static void ntsToEtsInSubplan(SubplanOperator s, IOptimizationContext context) throws AlgebricksException {
for (ILogicalPlan p : s.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
index 2a78a98..aa418ea 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
@@ -14,219 +14,18 @@
*/
package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
-public class IntroduceGroupByCombinerRule extends AbstractIntroduceCombinerRule {
+public class IntroduceGroupByCombinerRule extends AbstractIntroduceGroupByCombinerRule {
@Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
- if (context.checkIfInDontApplySet(this, op)) {
- return false;
- }
- if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
- return false;
- }
- GroupByOperator gbyOp = (GroupByOperator) op;
- if (gbyOp.getExecutionMode() != ExecutionMode.PARTITIONED) {
- return false;
- }
-
- BookkeepingInfo bi = new BookkeepingInfo();
- GroupByOperator newGbyOp = opToPush(gbyOp, bi, context);
- if (newGbyOp == null) {
- return false;
- }
-
- replaceOriginalAggFuncs(bi.toReplaceMap);
-
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
- LogicalVariable newDecorVar = context.newVar();
- newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
- p.second.setValue(new VariableReferenceExpression(newDecorVar));
- }
- newGbyOp.setExecutionMode(ExecutionMode.LOCAL);
- Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
- newGbyOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, v);
-
- Object v2 = gbyOp.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY);
- newGbyOp.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, v2);
-
- List<LogicalVariable> propagatedVars = new LinkedList<LogicalVariable>();
- VariableUtilities.getProducedVariables(newGbyOp, propagatedVars);
-
- Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
- OperatorPropertiesUtil.getFreeVariablesInSubplans(gbyOp, freeVars);
-
- for (LogicalVariable var : freeVars) {
- if (!propagatedVars.contains(var)) {
- LogicalVariable newDecorVar = context.newVar();
- newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
- VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
- newDecorVar, context);
- }
- }
-
- Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
- opRef3.setValue(newGbyOp);
- typeGby(newGbyOp, context);
- typeGby(gbyOp, context);
- context.addToDontApplySet(this, op);
- return true;
+ protected void processNullTest(IOptimizationContext context, GroupByOperator nestedGby,
+ List<LogicalVariable> aggregateVarsProducedByCombiner) {
+ /** The default introduce group-by combiner rule ignores null test, however a language implementor can decide their own semantics. */
}
- private void typeGby(AbstractOperatorWithNestedPlans op, IOptimizationContext context) throws AlgebricksException {
- for (ILogicalPlan p : op.getNestedPlans()) {
- OperatorPropertiesUtil.typePlan(p, context);
- }
- context.computeAndSetTypeEnvironmentForOperator(op);
- }
-
- private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
- throws AlgebricksException {
- // Hook up input to new group-by.
- Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
- ILogicalOperator op3 = opRef3.getValue();
- GroupByOperator newGbyOp = new GroupByOperator();
- newGbyOp.getInputs().add(new MutableObject<ILogicalOperator>(op3));
- // Copy annotations.
- Map<String, Object> annotations = newGbyOp.getAnnotations();
- annotations.putAll(gbyOp.getAnnotations());
-
- List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
- for (ILogicalPlan p : gbyOp.getNestedPlans()) {
- Pair<Boolean, ILogicalPlan> bip = tryToPushSubplan(p, gbyOp, newGbyOp, bi, gbyVars, context);
- if (!bip.first) {
- // For now, if we cannot push everything, give up.
- return null;
- }
- ILogicalPlan pushedSubplan = bip.second;
- if (pushedSubplan != null) {
- newGbyOp.getNestedPlans().add(pushedSubplan);
- }
- }
-
- ArrayList<LogicalVariable> newOpGbyList = new ArrayList<LogicalVariable>();
- ArrayList<LogicalVariable> replGbyList = new ArrayList<LogicalVariable>();
- // Find maximal sequence of variable.
- for (Map.Entry<GroupByOperator, List<LogicalVariable>> e : bi.modifyGbyMap.entrySet()) {
- List<LogicalVariable> varList = e.getValue();
- boolean see1 = true;
- int sz1 = newOpGbyList.size();
- int i = 0;
- for (LogicalVariable v : varList) {
- if (see1) {
- if (i < sz1) {
- LogicalVariable v2 = newOpGbyList.get(i);
- if (v != v2) {
- // cannot linearize
- return null;
- }
- } else {
- see1 = false;
- newOpGbyList.add(v);
- replGbyList.add(context.newVar());
- }
- i++;
- } else {
- newOpGbyList.add(v);
- replGbyList.add(context.newVar());
- }
- }
- }
- // set the vars in the new op
- int n = newOpGbyList.size();
- for (int i = 0; i < n; i++) {
- newGbyOp.addGbyExpression(replGbyList.get(i), new VariableReferenceExpression(newOpGbyList.get(i)));
- VariableUtilities.substituteVariables(gbyOp, newOpGbyList.get(i), replGbyList.get(i), false, context);
- }
- return newGbyOp;
- }
-
- private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan nestedPlan, GroupByOperator oldGbyOp,
- GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
- throws AlgebricksException {
- List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
- for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
- if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, pushedRoots)) {
- // For now, if we cannot push everything, give up.
- return new Pair<Boolean, ILogicalPlan>(false, null);
- }
- }
- if (pushedRoots.isEmpty()) {
- return new Pair<Boolean, ILogicalPlan>(true, null);
- } else {
- return new Pair<Boolean, ILogicalPlan>(true, new ALogicalPlanImpl(pushedRoots));
- }
- }
-
- private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
- BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
- List<Mutable<ILogicalOperator>> toPushAccumulate) throws AlgebricksException {
- AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
- if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
- return false;
- }
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
- if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
- AggregateOperator initAgg = (AggregateOperator) op1;
- Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, bi.toReplaceMap, context);
- if (!pOpRef.first) {
- return false;
- }
- Mutable<ILogicalOperator> opRef = pOpRef.second;
- if (opRef != null) {
- toPushAccumulate.add(opRef);
- }
- bi.modifyGbyMap.put(oldGbyOp, gbyVars);
- return true;
- } else {
- while (op2.getOperatorTag() != LogicalOperatorTag.GROUP && op2.getInputs().size() == 1) {
- op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
- }
- if (op2.getOperatorTag() != LogicalOperatorTag.GROUP) {
- return false;
- }
- GroupByOperator nestedGby = (GroupByOperator) op2;
- List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
- List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
- concatGbyVars.addAll(gbyVars2);
- for (ILogicalPlan p : nestedGby.getNestedPlans()) {
- for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
- if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate)) {
- return false;
- }
- }
- }
- return true;
- }
- }
-}
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
index 7d43a68..e1c8463 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
@@ -63,19 +63,42 @@
return false;
}
- InnerJoinOperator product = new InnerJoinOperator(
- new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+ /**
+ * finding the boundary between left branch and right branch
+ * operator pipeline on-top-of boundaryOpRef (exclusive) is the inner branch
+ * operator pipeline under boundaryOpRef (inclusive) is the outer branch
+ */
+ Mutable<ILogicalOperator> currentOpRef = opRef;
+ Mutable<ILogicalOperator> boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
+ while (currentOpRef.getValue().getInputs().size() == 1) {
+ currentOpRef = currentOpRef.getValue().getInputs().get(0);
+ }
+ Mutable<ILogicalOperator> tupleSourceOpRef = currentOpRef;
+ currentOpRef = opRef;
+ if (tupleSourceOpRef.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ while (currentOpRef.getValue().getInputs().size() == 1
+ && currentOpRef.getValue() instanceof AbstractScanOperator
+ && descOrSelfIsSourceScan((AbstractLogicalOperator) currentOpRef.getValue())) {
+ if (opsAreIndependent(currentOpRef.getValue(), tupleSourceOpRef.getValue())) {
+ /** move down the boundary if the operator is independent of the tuple source */
+ boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
+ } else {
+ break;
+ }
+ currentOpRef = currentOpRef.getValue().getInputs().get(0);
+ }
+ }
- EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
- context.computeAndSetTypeEnvironmentForOperator(ets);
- Mutable<ILogicalOperator> emptySrc = new MutableObject<ILogicalOperator>(ets);
- List<Mutable<ILogicalOperator>> opInpList = op.getInputs();
- opInpList.clear();
- opInpList.add(emptySrc);
- product.getInputs().add(opRef2); // outer branch
- product.getInputs().add(new MutableObject<ILogicalOperator>(op));
- opRef.setValue(product); // plug the product in the plan
- context.computeAndSetTypeEnvironmentForOperator(product);
+ /** join the two independent branches */
+ InnerJoinOperator join = new InnerJoinOperator(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE),
+ new MutableObject<ILogicalOperator>(boundaryOpRef.getValue()), new MutableObject<ILogicalOperator>(
+ opRef.getValue()));
+ opRef.setValue(join);
+ ILogicalOperator ets = new EmptyTupleSourceOperator();
+ boundaryOpRef.setValue(ets);
+ context.computeAndSetTypeEnvironmentForOperator(boundaryOpRef.getValue());
+ context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());
+ context.computeAndSetTypeEnvironmentForOperator(join);
return true;
}
@@ -98,6 +121,9 @@
}
private boolean opsAreIndependent(ILogicalOperator unnestOp, ILogicalOperator outer) throws AlgebricksException {
+ if (unnestOp.equals(outer)) {
+ return false;
+ }
List<LogicalVariable> opUsedVars = new ArrayList<LogicalVariable>();
VariableUtilities.getUsedVariables(unnestOp, opUsedVars);
Set<LogicalVariable> op2LiveVars = new HashSet<LogicalVariable>();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 13a08b7..c584971 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -15,6 +15,7 @@
package edu.uci.ics.pregelix.core.util;
import java.io.File;
+import java.util.Collections;
import java.util.EnumSet;
import org.apache.commons.io.FileUtils;
@@ -78,6 +79,7 @@
ncConfig1.nodeId = NC1_ID;
ncConfig1.ioDevices = "dev1,dev2";
ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
+ ncConfig1.appArgs = Collections.singletonList("65536");
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -90,6 +92,7 @@
ncConfig2.nodeId = NC2_ID;
ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
ncConfig2.ioDevices = "dev3,dev4";
+ ncConfig2.appArgs = Collections.singletonList("65536");
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index a8307d7..0f10b4d 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -66,8 +66,8 @@
}
};
- public RuntimeContext(INCApplicationContext appCtx) {
- int pageSize = 64 * 1024;
+ public RuntimeContext(INCApplicationContext appCtx, int vFrameSize) {
+ int pageSize = vFrameSize;
long memSize = Runtime.getRuntime().maxMemory();
long bufferSize = memSize / 4;
int numPages = (int) (bufferSize / pageSize);
diff --git a/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties b/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
index 79f42ed..82a7bfe 100644
--- a/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
+++ b/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
@@ -16,7 +16,7 @@
CC_CLIENTPORT=3099
#The CC port for Hyracks cluster management
-CC_CLUSTERPORT=1099
+CC_CLUSTERPORT=4099
#The CC port for REST communication
CC_HTTPPORT=16001
@@ -42,6 +42,9 @@
#The frame size of the internal dataflow engine
FRAME_SIZE=65536
+#The frame size of the vertex storage, it should be larger than the largest vertex byte size
+VFRAME_SIZE=65536
+
#The number of jobs whose logs are kept in-memory on the CC
JOB_HISTORY_SIZE=0
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
index f9b6a4e..1cba489 100644
--- a/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
@@ -96,7 +96,7 @@
cmd=( "${PREGELIX_HOME}/bin/pregelixnc" )
cmd+=( -cc-host $CCHOST -cc-port $CC_CLUSTERPORT
-cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR
- -node-id $NODEID -iodevices "${IO_DIRS}" -net-buffer-count 5 );
+ -node-id $NODEID -iodevices "${IO_DIRS}" -net-buffer-count 5 -- ${VFRAME_SIZE});
printf "\n\n\n********************************************\nStarting NC with command %s\n\n" "${cmd[*]}" >> "$NCLOGS_DIR/$NODEID.log"
${cmd[@]} >> "$NCLOGS_DIR/$NODEID.log" 2>&1 &
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
index fe72d7a..8ca227a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
@@ -23,7 +23,11 @@
@Override
public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
- rCtx = new RuntimeContext(ncAppCtx);
+ int vFrameSize = 65536;
+ if(args.length >0){
+ vFrameSize = Integer.parseInt(args[0]);
+ }
+ rCtx = new RuntimeContext(ncAppCtx, vFrameSize);
ncAppCtx.setApplicationObject(rCtx);
}