1. fix asterixdb issue 782
   --- push nested pipeline before a nested group-by operator into the combiner group-by operator in the AbstractIntroduceGroupByCombinerRule
   --- add a processNullTest abstract method in the AbstractIntroduceGroupByCombinerRule
   --  fix the join order in a subplan
2. allow user-configurable buffer cache page size (B-tree page size) in Pregelix

commit 4d9a11d0c05281a41bbabe03066478fe851b3a2b
Author: buyingyi <buyingyi@gmail.com>

Change-Id: Ib7761370df8606c55ac34c126554319586e824f0
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/64
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
new file mode 100644
index 0000000..aba68f9
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+
+public abstract class AbstractIntroduceGroupByCombinerRule extends AbstractIntroduceCombinerRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+            return false;
+        }
+        GroupByOperator gbyOp = (GroupByOperator) op;
+        if (gbyOp.getExecutionMode() != ExecutionMode.PARTITIONED) {
+            return false;
+        }
+
+        BookkeepingInfo bi = new BookkeepingInfo();
+        GroupByOperator newGbyOp = opToPush(gbyOp, bi, context);
+        if (newGbyOp == null) {
+            return false;
+        }
+
+        replaceOriginalAggFuncs(bi.toReplaceMap);
+
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
+            LogicalVariable newDecorVar = context.newVar();
+            newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
+            p.second.setValue(new VariableReferenceExpression(newDecorVar));
+        }
+        newGbyOp.setExecutionMode(ExecutionMode.LOCAL);
+        Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
+        newGbyOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, v);
+
+        Object v2 = gbyOp.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY);
+        newGbyOp.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, v2);
+
+        List<LogicalVariable> propagatedVars = new LinkedList<LogicalVariable>();
+        VariableUtilities.getProducedVariables(newGbyOp, propagatedVars);
+
+        Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
+        OperatorPropertiesUtil.getFreeVariablesInSubplans(gbyOp, freeVars);
+
+        for (LogicalVariable var : freeVars) {
+            if (!propagatedVars.contains(var)) {
+                LogicalVariable newDecorVar = context.newVar();
+                newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
+                VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
+                        newDecorVar, context);
+            }
+        }
+
+        Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
+        opRef3.setValue(newGbyOp);
+        typeGby(newGbyOp, context);
+        typeGby(gbyOp, context);
+        context.addToDontApplySet(this, op);
+        return true;
+    }
+
+    private void typeGby(AbstractOperatorWithNestedPlans op, IOptimizationContext context) throws AlgebricksException {
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            OperatorPropertiesUtil.typePlan(p, context);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(op);
+    }
+
+    private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
+            throws AlgebricksException {
+        // Hook up input to new group-by.
+        Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
+        ILogicalOperator op3 = opRef3.getValue();
+        GroupByOperator newGbyOp = new GroupByOperator();
+        newGbyOp.getInputs().add(new MutableObject<ILogicalOperator>(op3));
+        // Copy annotations.
+        Map<String, Object> annotations = newGbyOp.getAnnotations();
+        annotations.putAll(gbyOp.getAnnotations());
+
+        List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
+        for (ILogicalPlan p : gbyOp.getNestedPlans()) {
+            Pair<Boolean, ILogicalPlan> bip = tryToPushSubplan(p, gbyOp, newGbyOp, bi, gbyVars, context);
+            if (!bip.first) {
+                // For now, if we cannot push everything, give up.
+                return null;
+            }
+            ILogicalPlan pushedSubplan = bip.second;
+            if (pushedSubplan != null) {
+                newGbyOp.getNestedPlans().add(pushedSubplan);
+            }
+        }
+
+        ArrayList<LogicalVariable> newOpGbyList = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> replGbyList = new ArrayList<LogicalVariable>();
+        // Find maximal sequence of variable.
+        for (Map.Entry<GroupByOperator, List<LogicalVariable>> e : bi.modifyGbyMap.entrySet()) {
+            List<LogicalVariable> varList = e.getValue();
+            boolean see1 = true;
+            int sz1 = newOpGbyList.size();
+            int i = 0;
+            for (LogicalVariable v : varList) {
+                if (see1) {
+                    if (i < sz1) {
+                        LogicalVariable v2 = newOpGbyList.get(i);
+                        if (v != v2) {
+                            // cannot linearize
+                            return null;
+                        }
+                    } else {
+                        see1 = false;
+                        newOpGbyList.add(v);
+                        replGbyList.add(context.newVar());
+                    }
+                    i++;
+                } else {
+                    newOpGbyList.add(v);
+                    replGbyList.add(context.newVar());
+                }
+            }
+        }
+        // set the vars in the new op
+        int n = newOpGbyList.size();
+        for (int i = 0; i < n; i++) {
+            newGbyOp.addGbyExpression(replGbyList.get(i), new VariableReferenceExpression(newOpGbyList.get(i)));
+            VariableUtilities.substituteVariables(gbyOp, newOpGbyList.get(i), replGbyList.get(i), false, context);
+        }
+        return newGbyOp;
+    }
+
+    private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan nestedPlan, GroupByOperator oldGbyOp,
+            GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
+            throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
+        for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
+            if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, pushedRoots)) {
+                // For now, if we cannot push everything, give up.
+                return new Pair<Boolean, ILogicalPlan>(false, null);
+            }
+        }
+        if (pushedRoots.isEmpty()) {
+            return new Pair<Boolean, ILogicalPlan>(true, null);
+        } else {
+            return new Pair<Boolean, ILogicalPlan>(true, new ALogicalPlanImpl(pushedRoots));
+        }
+    }
+
+    private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
+            BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
+            List<Mutable<ILogicalOperator>> toPushAccumulate) throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+            AggregateOperator initAgg = (AggregateOperator) op1;
+            Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, bi.toReplaceMap, context);
+            if (!pOpRef.first) {
+                return false;
+            }
+            Mutable<ILogicalOperator> opRef = pOpRef.second;
+            if (opRef != null) {
+                toPushAccumulate.add(opRef);
+            }
+            bi.modifyGbyMap.put(oldGbyOp, gbyVars);
+            return true;
+        } else {
+            while (op2.getOperatorTag() != LogicalOperatorTag.GROUP && op2.getInputs().size() == 1) {
+                op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
+            }
+            if (op2.getOperatorTag() != LogicalOperatorTag.GROUP) {
+                return false;
+            }
+            GroupByOperator nestedGby = (GroupByOperator) op2;
+            List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
+            List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
+            concatGbyVars.addAll(gbyVars2);
+            for (ILogicalPlan p : nestedGby.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
+                    if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate)) {
+                        return false;
+                    }
+                }
+            }
+
+            /***
+             * Push the nested pipeline which provides the input to the nested group operator into newGbyOp (the combined gby op).
+             * The change is to fix asterixdb issue 782.
+             */
+            Mutable<ILogicalOperator> nestedGbyInputRef = nestedGby.getInputs().get(0);
+            Mutable<ILogicalOperator> startOfPipelineRef = nestedGbyInputRef;
+            if (startOfPipelineRef.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+                return true;
+            }
+
+            // move down the nested pipeline to find the start of the pipeline right upon the nested-tuple-source
+            boolean hasIsNullFunction = OperatorPropertiesUtil.isNullTest((AbstractLogicalOperator) startOfPipelineRef
+                    .getValue());
+            while (startOfPipelineRef.getValue().getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE) {
+                startOfPipelineRef = startOfPipelineRef.getValue().getInputs().get(0);
+                hasIsNullFunction = OperatorPropertiesUtil.isNullTest((AbstractLogicalOperator) startOfPipelineRef
+                        .getValue());
+            }
+            //keep the old nested-tuple-source
+            Mutable<ILogicalOperator> oldNts = startOfPipelineRef.getValue().getInputs().get(0);
+
+            //move down the nested op in the new gby operator
+            Mutable<ILogicalOperator> newGbyNestedOpRef = toPushAccumulate.get(0);
+            while (newGbyNestedOpRef.getValue().getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE) {
+                newGbyNestedOpRef = newGbyNestedOpRef.getValue().getInputs().get(0);
+            }
+
+            //insert the pipeline before nested gby into the new (combiner) gby's nested plan on top of the nested-tuple-source
+            startOfPipelineRef.getValue().getInputs().set(0, newGbyNestedOpRef.getValue().getInputs().get(0));
+            newGbyNestedOpRef.getValue().getInputs().set(0, nestedGbyInputRef);
+
+            //in the old gby operator, remove the nested pipeline since it is already pushed to the combiner gby
+            nestedGby.getInputs().set(0, oldNts);
+            List<LogicalVariable> aggProducedVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getProducedVariables(toPushAccumulate.get(0).getValue(), aggProducedVars);
+
+            if (hasIsNullFunction && aggProducedVars.size() != 0) {
+                // if the old nested pipeline contains a not-null-check, we need to convert it to a not-system-null-check in the non-local gby
+                processNullTest(context, nestedGby, aggProducedVars);
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     * Deal with the case where the nested plan in the combiner gby operator has a null-test before invoking aggregation functions.
+     * 
+     * @param context
+     *            The optimization context.
+     * @param nestedGby
+     *            The nested gby operator in the global gby operator's subplan.
+     * @param firstAggVar
+     *            The first aggregation variable produced by the combiner gby.
+     */
+    protected abstract void processNullTest(IOptimizationContext context, GroupByOperator nestedGby,
+            List<LogicalVariable> aggregateVarsProducedByCombiner);
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
index 3cd1100..73fd966 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -76,6 +77,7 @@
         }
 
         ntsToEtsInSubplan(subplan, context);
+        cleanupJoins(subplan);
         InnerJoinOperator join = new InnerJoinOperator(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
         join.getInputs().add(opRef3);
         opRef2.setValue(OperatorManipulationUtil.eliminateSingleSubplanOverEts(subplan));
@@ -90,6 +92,30 @@
         return false;
     }
 
+    private static void cleanupJoins(SubplanOperator s) {
+        for (ILogicalPlan p : s.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                cleanupJoins(r);
+            }
+        }
+    }
+
+    /** clean up joins that have one input branch that is empty tuple source */
+    private static void cleanupJoins(Mutable<ILogicalOperator> opRef) {
+        if (opRef.getValue() instanceof AbstractBinaryJoinOperator) {
+            for (Mutable<ILogicalOperator> inputRef : opRef.getValue().getInputs()) {
+                if (inputRef.getValue().getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
+                    opRef.getValue().getInputs().remove(inputRef);
+                    opRef.setValue(opRef.getValue().getInputs().get(0).getValue());
+                    break;
+                }
+            }
+        }
+        for (Mutable<ILogicalOperator> inputRef : opRef.getValue().getInputs()) {
+            cleanupJoins(inputRef);
+        }
+    }
+
     private static void ntsToEtsInSubplan(SubplanOperator s, IOptimizationContext context) throws AlgebricksException {
         for (ILogicalPlan p : s.getNestedPlans()) {
             for (Mutable<ILogicalOperator> r : p.getRoots()) {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
index 2a78a98..aa418ea 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
@@ -14,219 +14,18 @@
  */
 package edu.uci.ics.hyracks.algebricks.rewriter.rules;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 
-public class IntroduceGroupByCombinerRule extends AbstractIntroduceCombinerRule {
+public class IntroduceGroupByCombinerRule extends AbstractIntroduceGroupByCombinerRule {
 
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (context.checkIfInDontApplySet(this, op)) {
-            return false;
-        }
-        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
-            return false;
-        }
-        GroupByOperator gbyOp = (GroupByOperator) op;
-        if (gbyOp.getExecutionMode() != ExecutionMode.PARTITIONED) {
-            return false;
-        }
-
-        BookkeepingInfo bi = new BookkeepingInfo();
-        GroupByOperator newGbyOp = opToPush(gbyOp, bi, context);
-        if (newGbyOp == null) {
-            return false;
-        }
-
-        replaceOriginalAggFuncs(bi.toReplaceMap);
-
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
-            LogicalVariable newDecorVar = context.newVar();
-            newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
-            p.second.setValue(new VariableReferenceExpression(newDecorVar));
-        }
-        newGbyOp.setExecutionMode(ExecutionMode.LOCAL);
-        Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
-        newGbyOp.getAnnotations().put(OperatorAnnotations.USE_HASH_GROUP_BY, v);
-
-        Object v2 = gbyOp.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY);
-        newGbyOp.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, v2);
-
-        List<LogicalVariable> propagatedVars = new LinkedList<LogicalVariable>();
-        VariableUtilities.getProducedVariables(newGbyOp, propagatedVars);
-
-        Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
-        OperatorPropertiesUtil.getFreeVariablesInSubplans(gbyOp, freeVars);
-
-        for (LogicalVariable var : freeVars) {
-            if (!propagatedVars.contains(var)) {
-                LogicalVariable newDecorVar = context.newVar();
-                newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
-                VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
-                        newDecorVar, context);
-            }
-        }
-
-        Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
-        opRef3.setValue(newGbyOp);
-        typeGby(newGbyOp, context);
-        typeGby(gbyOp, context);
-    	context.addToDontApplySet(this, op);
-        return true;
+    protected void processNullTest(IOptimizationContext context, GroupByOperator nestedGby,
+            List<LogicalVariable> aggregateVarsProducedByCombiner) {
+        /** The default introduce group-by combiner rule ignores null test, however a language implementor can decide their own semantics. */
     }
 
-    private void typeGby(AbstractOperatorWithNestedPlans op, IOptimizationContext context) throws AlgebricksException {
-        for (ILogicalPlan p : op.getNestedPlans()) {
-            OperatorPropertiesUtil.typePlan(p, context);
-        }
-        context.computeAndSetTypeEnvironmentForOperator(op);
-    }
-
-    private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
-            throws AlgebricksException {
-        // Hook up input to new group-by.
-        Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
-        ILogicalOperator op3 = opRef3.getValue();
-        GroupByOperator newGbyOp = new GroupByOperator();
-        newGbyOp.getInputs().add(new MutableObject<ILogicalOperator>(op3));
-        // Copy annotations.        
-        Map<String, Object> annotations = newGbyOp.getAnnotations();
-        annotations.putAll(gbyOp.getAnnotations());
-
-        List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
-        for (ILogicalPlan p : gbyOp.getNestedPlans()) {
-            Pair<Boolean, ILogicalPlan> bip = tryToPushSubplan(p, gbyOp, newGbyOp, bi, gbyVars, context);
-            if (!bip.first) {
-                // For now, if we cannot push everything, give up.
-                return null;
-            }
-            ILogicalPlan pushedSubplan = bip.second;
-            if (pushedSubplan != null) {
-                newGbyOp.getNestedPlans().add(pushedSubplan);
-            }
-        }
-
-        ArrayList<LogicalVariable> newOpGbyList = new ArrayList<LogicalVariable>();
-        ArrayList<LogicalVariable> replGbyList = new ArrayList<LogicalVariable>();
-        // Find maximal sequence of variable.
-        for (Map.Entry<GroupByOperator, List<LogicalVariable>> e : bi.modifyGbyMap.entrySet()) {
-            List<LogicalVariable> varList = e.getValue();
-            boolean see1 = true;
-            int sz1 = newOpGbyList.size();
-            int i = 0;
-            for (LogicalVariable v : varList) {
-                if (see1) {
-                    if (i < sz1) {
-                        LogicalVariable v2 = newOpGbyList.get(i);
-                        if (v != v2) {
-                            // cannot linearize
-                            return null;
-                        }
-                    } else {
-                        see1 = false;
-                        newOpGbyList.add(v);
-                        replGbyList.add(context.newVar());
-                    }
-                    i++;
-                } else {
-                    newOpGbyList.add(v);
-                    replGbyList.add(context.newVar());
-                }
-            }
-        }
-        // set the vars in the new op
-        int n = newOpGbyList.size();
-        for (int i = 0; i < n; i++) {
-            newGbyOp.addGbyExpression(replGbyList.get(i), new VariableReferenceExpression(newOpGbyList.get(i)));
-            VariableUtilities.substituteVariables(gbyOp, newOpGbyList.get(i), replGbyList.get(i), false, context);
-        }
-        return newGbyOp;
-    }
-
-    private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan nestedPlan, GroupByOperator oldGbyOp,
-            GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
-            throws AlgebricksException {
-        List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
-        for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
-            if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, pushedRoots)) {
-                // For now, if we cannot push everything, give up.
-                return new Pair<Boolean, ILogicalPlan>(false, null);
-            }
-        }
-        if (pushedRoots.isEmpty()) {
-            return new Pair<Boolean, ILogicalPlan>(true, null);
-        } else {
-            return new Pair<Boolean, ILogicalPlan>(true, new ALogicalPlanImpl(pushedRoots));
-        }
-    }
-
-    private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
-            BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
-            List<Mutable<ILogicalOperator>> toPushAccumulate) throws AlgebricksException {
-        AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
-        if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
-            return false;
-        }
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
-        if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
-            AggregateOperator initAgg = (AggregateOperator) op1;
-            Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, bi.toReplaceMap, context);
-            if (!pOpRef.first) {
-                return false;
-            }
-            Mutable<ILogicalOperator> opRef = pOpRef.second;
-            if (opRef != null) {
-                toPushAccumulate.add(opRef);
-            }
-            bi.modifyGbyMap.put(oldGbyOp, gbyVars);
-            return true;
-        } else {
-            while (op2.getOperatorTag() != LogicalOperatorTag.GROUP && op2.getInputs().size() == 1) {
-                op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
-            }
-            if (op2.getOperatorTag() != LogicalOperatorTag.GROUP) {
-                return false;
-            }
-            GroupByOperator nestedGby = (GroupByOperator) op2;
-            List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
-            List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
-            concatGbyVars.addAll(gbyVars2);
-            for (ILogicalPlan p : nestedGby.getNestedPlans()) {
-                for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
-                    if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate)) {
-                        return false;
-                    }
-                }
-            }
-            return true;
-        }
-    }
-}
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
index 7d43a68..e1c8463 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
@@ -63,19 +63,42 @@
             return false;
         }
 
-        InnerJoinOperator product = new InnerJoinOperator(
-                new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+        /**
+         * finding the boundary between left branch and right branch
+         * operator pipeline on-top-of boundaryOpRef (exclusive) is the inner branch
+         * operator pipeline under boundaryOpRef (inclusive) is the outer branch
+         */
+        Mutable<ILogicalOperator> currentOpRef = opRef;
+        Mutable<ILogicalOperator> boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
+        while (currentOpRef.getValue().getInputs().size() == 1) {
+            currentOpRef = currentOpRef.getValue().getInputs().get(0);
+        }
+        Mutable<ILogicalOperator> tupleSourceOpRef = currentOpRef;
+        currentOpRef = opRef;
+        if (tupleSourceOpRef.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+            while (currentOpRef.getValue().getInputs().size() == 1
+                    && currentOpRef.getValue() instanceof AbstractScanOperator
+                    && descOrSelfIsSourceScan((AbstractLogicalOperator) currentOpRef.getValue())) {
+                if (opsAreIndependent(currentOpRef.getValue(), tupleSourceOpRef.getValue())) {
+                    /** move down the boundary if the operator is independent of the tuple source */
+                    boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
+                } else {
+                    break;
+                }
+                currentOpRef = currentOpRef.getValue().getInputs().get(0);
+            }
+        }
 
-        EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
-        context.computeAndSetTypeEnvironmentForOperator(ets);
-        Mutable<ILogicalOperator> emptySrc = new MutableObject<ILogicalOperator>(ets);
-        List<Mutable<ILogicalOperator>> opInpList = op.getInputs();
-        opInpList.clear();
-        opInpList.add(emptySrc);
-        product.getInputs().add(opRef2); // outer branch
-        product.getInputs().add(new MutableObject<ILogicalOperator>(op));
-        opRef.setValue(product); // plug the product in the plan
-        context.computeAndSetTypeEnvironmentForOperator(product);
+        /** join the two independent branches */
+        InnerJoinOperator join = new InnerJoinOperator(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE),
+                new MutableObject<ILogicalOperator>(boundaryOpRef.getValue()), new MutableObject<ILogicalOperator>(
+                        opRef.getValue()));
+        opRef.setValue(join);
+        ILogicalOperator ets = new EmptyTupleSourceOperator();
+        boundaryOpRef.setValue(ets);
+        context.computeAndSetTypeEnvironmentForOperator(boundaryOpRef.getValue());
+        context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());
+        context.computeAndSetTypeEnvironmentForOperator(join);
         return true;
     }
 
@@ -98,6 +121,9 @@
     }
 
     private boolean opsAreIndependent(ILogicalOperator unnestOp, ILogicalOperator outer) throws AlgebricksException {
+        if (unnestOp.equals(outer)) {
+            return false;
+        }
         List<LogicalVariable> opUsedVars = new ArrayList<LogicalVariable>();
         VariableUtilities.getUsedVariables(unnestOp, opUsedVars);
         Set<LogicalVariable> op2LiveVars = new HashSet<LogicalVariable>();
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 13a08b7..c584971 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.pregelix.core.util;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.EnumSet;
 
 import org.apache.commons.io.FileUtils;
@@ -78,6 +79,7 @@
         ncConfig1.nodeId = NC1_ID;
         ncConfig1.ioDevices = "dev1,dev2";
         ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
+        ncConfig1.appArgs = Collections.singletonList("65536");
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
 
@@ -90,6 +92,7 @@
         ncConfig2.nodeId = NC2_ID;
         ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
         ncConfig2.ioDevices = "dev3,dev4";
+        ncConfig2.appArgs = Collections.singletonList("65536");
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index a8307d7..0f10b4d 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -66,8 +66,8 @@
         }
     };
 
-    public RuntimeContext(INCApplicationContext appCtx) {
-        int pageSize = 64 * 1024;
+    public RuntimeContext(INCApplicationContext appCtx, int vFrameSize) {
+        int pageSize = vFrameSize;
         long memSize = Runtime.getRuntime().maxMemory();
         long bufferSize = memSize / 4;
         int numPages = (int) (bufferSize / pageSize);
diff --git a/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties b/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
index 79f42ed..82a7bfe 100644
--- a/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
+++ b/pregelix/pregelix-dist/src/main/resources/conf/cluster.properties
@@ -16,7 +16,7 @@
 CC_CLIENTPORT=3099
 
 #The CC port for Hyracks cluster management
-CC_CLUSTERPORT=1099
+CC_CLUSTERPORT=4099
 
 #The CC port for REST communication
 CC_HTTPPORT=16001
@@ -42,6 +42,9 @@
 #The frame size of the internal dataflow engine
 FRAME_SIZE=65536
 
+#The frame size of the vertex storage, it should be larger than the largest vertex byte size
+VFRAME_SIZE=65536
+
 #The number of jobs whose logs are kept in-memory on the CC
 JOB_HISTORY_SIZE=0
 
diff --git a/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
index f9b6a4e..1cba489 100644
--- a/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-dist/src/main/resources/scripts/startnc.sh
@@ -96,7 +96,7 @@
 cmd=( "${PREGELIX_HOME}/bin/pregelixnc" )
 cmd+=( -cc-host $CCHOST -cc-port $CC_CLUSTERPORT 
 	   -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR
-	   -node-id $NODEID -iodevices "${IO_DIRS}" -net-buffer-count 5 );
+	   -node-id $NODEID -iodevices "${IO_DIRS}" -net-buffer-count 5 -- ${VFRAME_SIZE});
 
 printf "\n\n\n********************************************\nStarting NC with command %s\n\n" "${cmd[*]}" >> "$NCLOGS_DIR/$NODEID.log"
 ${cmd[@]} >> "$NCLOGS_DIR/$NODEID.log" 2>&1 &
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
index fe72d7a..8ca227a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
@@ -23,7 +23,11 @@
 
     @Override
     public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
-        rCtx = new RuntimeContext(ncAppCtx);
+        int vFrameSize = 65536;
+        if(args.length >0){
+            vFrameSize = Integer.parseInt(args[0]);
+        }
+        rCtx = new RuntimeContext(ncAppCtx, vFrameSize);
         ncAppCtx.setApplicationObject(rCtx);
     }