merge the fix of Issue 430 to master branch
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index 2f7c0ed..0c105d0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -20,12 +20,11 @@
     // private ArrayList<AggregateFunctionCallExpression> expressions;
     // TODO type safe list of expressions
     private List<Mutable<ILogicalExpression>> mergeExpressions;
-    private LogicalVariable partitioningVariable;
     private boolean global;
 
     public AggregateOperator(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
         super(variables, expressions);
-        global = false;
+        global = true;
     }
 
     @Override
@@ -71,16 +70,8 @@
         return mergeExpressions;
     }
 
-    public void setPartitioningVariable(LogicalVariable partitioningVariable) {
-        this.partitioningVariable = partitioningVariable;
-    }
-
-    public LogicalVariable getPartitioningVariable() {
-        return partitioningVariable;
-    }
-
-    public void setGlobal() {
-        global = true;
+    public void setGlobal(boolean global) {
+        this.global = global;
     }
 
     public boolean isGlobal() {
@@ -100,4 +91,5 @@
         }
         return env;
     }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index a620e54..42f7e20 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -77,9 +77,6 @@
         for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {

             exprRef.getValue().getUsedVariables(usedVariables);

         }

-        if (op.getPartitioningVariable() != null) {

-            usedVariables.add(op.getPartitioningVariable());

-        }

         return null;

     }

 

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 5d490e9..f47c2ec 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -16,12 +16,10 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -31,7 +29,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
@@ -40,7 +37,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
@@ -59,30 +55,29 @@
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        IPhysicalPropertiesVector childProps = op2.getDeliveredPhysicalProperties();
-        deliveredProperties = new StructuralPropertiesVector(childProps.getPartitioningProperty(),
-                new ArrayList<ILocalStructuralProperty>(0));
+        AggregateOperator aggOp = (AggregateOperator) op;
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        if (aggOp.getExecutionMode() != AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+            deliveredProperties = new StructuralPropertiesVector(op2.getDeliveredPhysicalProperties()
+                    .getPartitioningProperty(), new ArrayList<ILocalStructuralProperty>());
+        } else {
+            deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED,
+                    new ArrayList<ILocalStructuralProperty>());
+        }
     }
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
         AggregateOperator aggOp = (AggregateOperator) op;
-        if (aggOp.getExecutionMode() == ExecutionMode.PARTITIONED && aggOp.getPartitioningVariable() != null) {
-            StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
-            if (aggOp.isGlobal()) {
-                pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
-            } else {
-                Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
-                partitioningVariables.add(aggOp.getPartitioningVariable());
-                pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null),
-                        null);
-            }
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+        if (aggOp.isGlobal() && aggOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+            pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
         }
+
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index 11e24d7..531b300 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -37,10 +38,8 @@
 
 public class StreamLimitPOperator extends AbstractPhysicalOperator {
 
-    private boolean global;
+    public StreamLimitPOperator() {
 
-    public StreamLimitPOperator(boolean global) {
-        this.global = global;
     }
 
     @Override
@@ -55,14 +54,22 @@
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
         ILogicalOperator op2 = op.getInputs().get(0).getValue();
-        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+        if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+            //partitioning property: unpartitioned;  local property: whatever from the child
+            deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, op2
+                    .getDeliveredPhysicalProperties().getLocalProperties());
+        } else {
+            deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+        }
     }
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
-        if (global) {
+        AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
+        if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
             pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
@@ -83,7 +90,8 @@
         ILogicalExpression offsetExpr = limit.getOffset().getValue();
         IScalarEvaluatorFactory offsetFact = (offsetExpr == null) ? null : expressionRuntimeProvider
                 .createEvaluatorFactory(offsetExpr, env, inputSchemas, context);
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
         StreamLimitRuntimeFactory runtime = new StreamLimitRuntimeFactory(maxObjectsFact, offsetFact, null,
                 context.getBinaryIntegerInspectorFactory());
         builder.contributeMicroOperator(limit, runtime, recDesc);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 47a979c..35b36dc 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -96,14 +97,21 @@
                 break;
             }
             default: {
+                boolean forceUnpartitioned = false;
                 if (op.getOperatorTag() == LogicalOperatorTag.LIMIT) {
                     LimitOperator opLim = (LimitOperator) op;
                     if (opLim.isTopmostLimitOp()) {
-                        if (opLim.getExecutionMode() != AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
-                            opLim.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
-                            change = true;
-                        }
-                        break;
+                        opLim.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+                        change = true;
+                        forceUnpartitioned = true;
+                    }
+                }
+                if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                    AggregateOperator aggOp = (AggregateOperator) op;
+                    if (aggOp.isGlobal()) {
+                        op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+                        change = true;
+                        forceUnpartitioned = true;
                     }
                 }
 
@@ -112,6 +120,8 @@
                     AbstractLogicalOperator inputOp = (AbstractLogicalOperator) i.getValue();
                     switch (inputOp.getExecutionMode()) {
                         case PARTITIONED: {
+                            if (forceUnpartitioned)
+                                break;
                             op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
                             change = true;
                             exit = true;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index 30ab542..013ddda 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -16,12 +16,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -110,17 +108,10 @@
             } else {
                 // The local aggregate operator is fed by the input of the original aggregate operator.
                 pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(initAgg.getInputs().get(0).getValue()));
-                // Set the partitioning variable in the local agg to ensure it is not projected away.
-                context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
-                LogicalVariable trueVar = context.newVar();
                 // Reintroduce assign op for the global agg partitioning var.
-                AssignOperator trueAssignOp = new AssignOperator(trueVar, new MutableObject<ILogicalExpression>(
-                        ConstantExpression.TRUE));
-                trueAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(pushedAgg));
-                context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
-                initAgg.setPartitioningVariable(trueVar);
-                initAgg.getInputs().get(0).setValue(trueAssignOp);
-                initAgg.setGlobal();
+                initAgg.getInputs().get(0).setValue(pushedAgg);
+                pushedAgg.setGlobal(false);
+                context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
             }
             return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
         } else {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
index fa5000e..06172d9 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
@@ -55,6 +55,11 @@
             return false;
         }
 
+        //stop rewriting if the operators originates from a nested tuple source
+        if (insideSubplan(opRef)) {
+            return false;
+        }
+
         // We may pull selects above the join we create in order to eliminate possible dependencies between
         // the outer and inner input plans of the join.
         List<ILogicalOperator> topSelects = new ArrayList<ILogicalOperator>();
@@ -286,4 +291,24 @@
         return findPlanPartition((AbstractLogicalOperator) op.getInputs().get(0).getValue(), innerUsedVars,
                 outerUsedVars, innerOps, outerOps, topSelects, belowSecondUnnest);
     }
+
+    /**
+     * check whether the operator is inside a sub-plan
+     * 
+     * @param nestedRootRef
+     * @return true-if it is; false otherwise.
+     */
+    private boolean insideSubplan(Mutable<ILogicalOperator> nestedRootRef) {
+        AbstractLogicalOperator nestedRoot = (AbstractLogicalOperator) nestedRootRef.getValue();
+        if (nestedRoot.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+            return true;
+        }
+        List<Mutable<ILogicalOperator>> inputs = nestedRoot.getInputs();
+        for (Mutable<ILogicalOperator> input : inputs) {
+            if (insideSubplan(input)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index 3260ca0..7bc150a 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -51,6 +51,7 @@
         orderBreakingOps.add(LogicalOperatorTag.INNERJOIN);
         orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN);
         orderBreakingOps.add(LogicalOperatorTag.UNIONALL);
+        orderBreakingOps.add(LogicalOperatorTag.AGGREGATE);
     }
 
     @Override
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
index f017e0f..9becf6e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -48,57 +48,46 @@
 
 /**
  * Factors out common sub-expressions by assigning them to a variables, and replacing the common sub-expressions with references to those variables.
- *
  * Preconditions/Assumptions:
  * Assumes no projects are in the plan. This rule ignores variable reference expressions and constants (other rules deal with those separately).
- * 
  * Postconditions/Examples:
  * Plan with extracted sub-expressions. Generates one assign operator per extracted expression.
- * 
  * Example 1 - Simple Arithmetic Example (simplified)
- * 
  * Before plan:
  * assign [$$1] <- [5 + 6 - 10]
- *   assign [$$0] <- [5 + 6 + 30]
- * 
+ * assign [$$0] <- [5 + 6 + 30]
  * After plan:
  * assign [$$1] <- [$$5 - 10]
- *   assign [$$0] <- [$$5 + 30]
- *     assign [$$5] <- [5 + 6]
- * 
+ * assign [$$0] <- [$$5 + 30]
+ * assign [$$5] <- [5 + 6]
  * Example 2 - Cleaning up 'Distinct By' (simplified)
- * 
  * Before plan: (notice how $$0 is not live after the distinct)
  * assign [$$3] <- [field-access($$0, 1)]
- *   distinct ([%0->$$5])
- *     assign [$$5] <- [field-access($$0, 1)]
- *       unnest $$0 <- [scan-dataset]
- * 
+ * distinct ([%0->$$5])
+ * assign [$$5] <- [field-access($$0, 1)]
+ * unnest $$0 <- [scan-dataset]
  * After plan: (notice how the issue of $$0 is fixed)
  * assign [$$3] <- [$$5]
- *   distinct ([$$5])
- *     assign [$$5] <- [field-access($$0, 1)]
- *       unnest $$0 <- [scan-dataset]
- * 
+ * distinct ([$$5])
+ * assign [$$5] <- [field-access($$0, 1)]
+ * unnest $$0 <- [scan-dataset]
  * Example 3 - Pulling Common Expressions Above Joins (simplified)
- * 
  * Before plan:
  * assign [$$9] <- funcZ(funcY($$8))
- *   join (funcX(funcY($$8)))
- * 
+ * join (funcX(funcY($$8)))
  * After plan:
  * assign [$$9] <- funcZ($$10))
- *   select (funcX($$10))
- *     assign [$$10] <- [funcY($$8)]
- *       join (TRUE)
+ * select (funcX($$10))
+ * assign [$$10] <- [funcY($$8)]
+ * join (TRUE)
  */
 public class ExtractCommonExpressionsRule implements IAlgebraicRewriteRule {
 
     private final List<ILogicalExpression> originalAssignExprs = new ArrayList<ILogicalExpression>();
-    
+
     private final CommonExpressionSubstitutionVisitor substVisitor = new CommonExpressionSubstitutionVisitor();
     private final Map<ILogicalExpression, ExprEquivalenceClass> exprEqClassMap = new HashMap<ILogicalExpression, ExprEquivalenceClass>();
-    
+
     // Set of operators for which common subexpression elimination should not be performed.
     private static final Set<LogicalOperatorTag> ignoreOps = new HashSet<LogicalOperatorTag>();
     static {
@@ -109,9 +98,10 @@
         ignoreOps.add(LogicalOperatorTag.AGGREGATE);
         ignoreOps.add(LogicalOperatorTag.RUNNINGAGGREGATE);
     }
-    
+
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -126,7 +116,8 @@
         return modified;
     }
 
-    private void updateEquivalenceClassMap(LogicalVariable lhs, Mutable<ILogicalExpression> rhsExprRef, ILogicalExpression rhsExpr, ILogicalOperator op) {
+    private void updateEquivalenceClassMap(LogicalVariable lhs, Mutable<ILogicalExpression> rhsExprRef,
+            ILogicalExpression rhsExpr, ILogicalOperator op) {
         ExprEquivalenceClass exprEqClass = exprEqClassMap.get(rhsExpr);
         if (exprEqClass == null) {
             exprEqClass = new ExprEquivalenceClass(op, rhsExprRef);
@@ -141,7 +132,7 @@
         if (context.checkIfInDontApplySet(this, opRef.getValue())) {
             return false;
         }
-        
+
         boolean modified = false;
         // Recurse into children.
         for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
@@ -149,7 +140,7 @@
                 modified = true;
             }
         }
-        
+
         // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map, since we want to avoid incorrect expression replacement
         // (the resulting new variables should be assigned live below a replicate).
         if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE) {
@@ -160,7 +151,7 @@
         if (ignoreOps.contains(op.getOperatorTag())) {
             return modified;
         }
-        
+
         // Remember a copy of the original assign expressions, so we can add them to the equivalence class map
         // after replacing expressions within the assign operator itself.
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
@@ -173,13 +164,13 @@
                 originalAssignExprs.add(expr.cloneExpression());
             }
         }
-        
+
         // Perform common subexpression elimination.
         substVisitor.setOperator(op);
         if (op.acceptExpressionTransform(substVisitor)) {
             modified = true;
         }
-        
+
         // Update equivalence class map.
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
@@ -194,7 +185,7 @@
                 // Update equivalence class map.
                 LogicalVariable lhs = assignOp.getVariables().get(i);
                 updateEquivalenceClassMap(lhs, exprRef, exprRef.getValue(), op);
-                
+
                 // Update equivalence class map with original assign expression.
                 updateEquivalenceClassMap(lhs, exprRef, originalAssignExprs.get(i), op);
             }
@@ -225,35 +216,30 @@
     }
 
     private class CommonExpressionSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
-                
-        private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
-        private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+
         private IOptimizationContext context;
-        private ILogicalOperator op;        
-        
+        private ILogicalOperator op;
+
         public void setContext(IOptimizationContext context) {
             this.context = context;
         }
-        
+
         public void setOperator(ILogicalOperator op) throws AlgebricksException {
             this.op = op;
-            liveVars.clear();
-            usedVars.clear();
         }
-        
+
         @Override
         public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
-            if (liveVars.isEmpty() && usedVars.isEmpty()) {
-                VariableUtilities.getLiveVariables(op, liveVars);
-                VariableUtilities.getUsedVariables(op, usedVars);
-            }
-            
             AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
             boolean modified = false;
             ExprEquivalenceClass exprEqClass = exprEqClassMap.get(expr);
             if (exprEqClass != null) {
                 // Replace common subexpression with existing variable. 
                 if (exprEqClass.variableIsSet()) {
+                    Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+                    List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+                    VariableUtilities.getLiveVariables(op, liveVars);
+                    VariableUtilities.getUsedVariables(op, usedVars);
                     // Check if the replacing variable is live at this op.
                     // However, if the op is already using variables that are not live, then a replacement may enable fixing the plan.
                     // This behavior is necessary to, e.g., properly deal with distinct by.
@@ -266,9 +252,15 @@
                     }
                 } else {
                     if (assignCommonExpression(exprEqClass, expr)) {
-                        exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
-                        // Do not descend into children since this expr has been completely replaced.
-                        return true;
+                        //re-obtain the live vars after rewriting in the method called in the if condition
+                        Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+                        VariableUtilities.getLiveVariables(op, liveVars);
+                        //rewrite only when the variable is live
+                        if (liveVars.contains(exprEqClass.getVariable())) {
+                            exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
+                            // Do not descend into children since this expr has been completely replaced.
+                            return true;
+                        }
                     }
                 }
             } else {
@@ -278,7 +270,7 @@
                     exprEqClassMap.put(expr, exprEqClass);
                 }
             }
-            
+
             // Descend into function arguments.
             if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
                 AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
@@ -290,17 +282,19 @@
             }
             return modified;
         }
-        
-        private boolean assignCommonExpression(ExprEquivalenceClass exprEqClass, ILogicalExpression expr) throws AlgebricksException {
+
+        private boolean assignCommonExpression(ExprEquivalenceClass exprEqClass, ILogicalExpression expr)
+                throws AlgebricksException {
             AbstractLogicalOperator firstOp = (AbstractLogicalOperator) exprEqClass.getFirstOperator();
             Mutable<ILogicalExpression> firstExprRef = exprEqClass.getFirstExpression();
-            if (firstOp.getOperatorTag() == LogicalOperatorTag.INNERJOIN || firstOp.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+            if (firstOp.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+                    || firstOp.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
                 // Do not extract common expressions from within the same join operator.
                 if (firstOp == op) {
                     return false;
                 }
                 AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) firstOp;
-                Mutable<ILogicalExpression> joinCond = joinOp.getCondition();                
+                Mutable<ILogicalExpression> joinCond = joinOp.getCondition();
                 ILogicalExpression enclosingExpr = getEnclosingExpression(joinCond, firstExprRef.getValue());
                 if (enclosingExpr == null) {
                     // No viable enclosing expression that we can pull out from the join.
@@ -312,12 +306,13 @@
                 op.getInputs().get(0).setValue(selectOp);
                 // Set firstOp to be the select below op, since we want to assign the common subexpr there.
                 firstOp = (AbstractLogicalOperator) selectOp;
-            } else if (firstOp.getInputs().size() > 1) { 
+            } else if (firstOp.getInputs().size() > 1) {
                 // Bail for any non-join operator with multiple inputs.
                 return false;
-            }                        
+            }
             LogicalVariable newVar = context.newVar();
-            AssignOperator newAssign = new AssignOperator(newVar, new MutableObject<ILogicalExpression>(firstExprRef.getValue().cloneExpression()));            
+            AssignOperator newAssign = new AssignOperator(newVar, new MutableObject<ILogicalExpression>(firstExprRef
+                    .getValue().cloneExpression()));
             // Place assign below firstOp.
             newAssign.getInputs().add(new MutableObject<ILogicalOperator>(firstOp.getInputs().get(0).getValue()));
             newAssign.setExecutionMode(firstOp.getExecutionMode());
@@ -330,7 +325,8 @@
             return true;
         }
 
-        private ILogicalExpression getEnclosingExpression(Mutable<ILogicalExpression> conditionExprRef, ILogicalExpression commonSubExpr) {
+        private ILogicalExpression getEnclosingExpression(Mutable<ILogicalExpression> conditionExprRef,
+                ILogicalExpression commonSubExpr) {
             ILogicalExpression conditionExpr = conditionExprRef.getValue();
             if (conditionExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
                 return null;
@@ -371,7 +367,7 @@
             return enclosingBoolExpr;
         }
     }
-    
+
     private boolean containsExpr(ILogicalExpression expr, ILogicalExpression searchExpr) {
         if (expr == searchExpr) {
             return true;
@@ -387,7 +383,7 @@
         }
         return false;
     }
-    
+
     private boolean isEqJoinCondition(ILogicalExpression expr) {
         AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
         if (funcExpr.getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.EQ)) {
@@ -400,38 +396,38 @@
         }
         return false;
     }
-    
+
     private final class ExprEquivalenceClass {
         // First operator in which expression is used.
         private final ILogicalOperator firstOp;
-        
+
         // Reference to expression in first op.
         private final Mutable<ILogicalExpression> firstExprRef;
-        
+
         // Variable that this expression has been assigned to.
         private LogicalVariable var;
-        
+
         public ExprEquivalenceClass(ILogicalOperator firstOp, Mutable<ILogicalExpression> firstExprRef) {
             this.firstOp = firstOp;
             this.firstExprRef = firstExprRef;
         }
-        
+
         public ILogicalOperator getFirstOperator() {
             return firstOp;
         }
-        
+
         public Mutable<ILogicalExpression> getFirstExpression() {
             return firstExprRef;
         }
-        
+
         public void setVariable(LogicalVariable var) {
             this.var = var;
         }
-        
+
         public LogicalVariable getVariable() {
             return var;
         }
-        
+
         public boolean variableIsSet() {
             return var != null;
         }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
index c3d935c..c75db57 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
@@ -29,7 +29,7 @@
             return false;
         }
         AggregateOperator aggOp = (AggregateOperator) op;
-        if (aggOp.getExecutionMode() != ExecutionMode.PARTITIONED || aggOp.getPartitioningVariable() == null) {
+        if (!aggOp.isGlobal() || aggOp.getExecutionMode() == ExecutionMode.LOCAL) {
             return false;
         }
         Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
index acfdbd3..e5f60bc 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
@@ -106,7 +106,7 @@
                     opLim.getMaxObjects(), opLim.getOffset());
             clone2 = new LimitOperator(maxPlusOffset, false);
         }
-        clone2.setPhysicalOperator(new StreamLimitPOperator(false));
+        clone2.setPhysicalOperator(new StreamLimitPOperator());
         clone2.getInputs().add(new MutableObject<ILogicalOperator>(op2));
         clone2.setExecutionMode(op2.getExecutionMode());
         clone2.recomputeSchema();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f1b9a7b..9c8ad46 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -21,7 +21,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -172,9 +171,7 @@
                     break;
                 }
                 case LIMIT: {
-                    LimitOperator opLim = (LimitOperator) op;
-                    op.setPhysicalOperator(new StreamLimitPOperator(opLim.isTopmostLimitOp()
-                            && opLim.getExecutionMode() == ExecutionMode.PARTITIONED));
+                    op.setPhysicalOperator(new StreamLimitPOperator());
                     break;
                 }
                 case NESTEDTUPLESOURCE: {
diff --git a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
index 1cc34e1..f42e321 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
+++ b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
@@ -60,6 +60,5 @@
 # For example, set the com.xyz.foo logger to only log SEVERE
 # messages:
 
-edu.uci.ics.asterix.level = WARNING
-edu.uci.ics.algebricks.level = WARNING
+#edu.uci.ics.hyracks.algebricks.level = FINEST
 edu.uci.ics.hyracks.level = WARNING
diff --git a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
index 7e4e271..a049f15 100644
--- a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
+++ b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
@@ -40,9 +40,7 @@
     public final static LinkedList<IAlgebraicRewriteRule> NORMALIZATION = new LinkedList<IAlgebraicRewriteRule>();

     static {

         NORMALIZATION.add(new EliminateSubplanRule());

-        NORMALIZATION.add(new IntroduceAggregateCombinerRule());

         NORMALIZATION.add(new BreakSelectIntoConjunctsRule());

-        NORMALIZATION.add(new IntroduceAggregateCombinerRule());

         NORMALIZATION.add(new PushSelectIntoJoinRule());

         NORMALIZATION.add(new ExtractGbyExpressionsRule());

         NORMALIZATION.add(new RemoveRedundantSelectRule());

@@ -84,6 +82,7 @@
         CONSOLIDATION.add(new IntroduceEarlyProjectRule());

         CONSOLIDATION.add(new ConsolidateAssignsRule());

         CONSOLIDATION.add(new IntroduceGroupByCombinerRule());

+        CONSOLIDATION.add(new IntroduceAggregateCombinerRule());

         CONSOLIDATION.add(new RemoveUnusedAssignAndAggregateRule());

     }

 

diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml
index 56046f7..8e67891 100644
--- a/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks/hyracks-dataflow-std/pom.xml
@@ -45,5 +45,10 @@
   		<type>jar</type>
   		<scope>test</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>commons-io</groupId>
+  		<artifactId>commons-io</artifactId>
+  		<version>2.4</version>
+  	</dependency>
   </dependencies>
 </project>
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
new file mode 100644
index 0000000..bf7ff33
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class FileRemoveOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private final IFileSplitProvider fileSplitProvider;
+
+    public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvder) {
+        super(spec, 0, 0);
+        this.fileSplitProvider = fileSplitProvder;
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        final FileSplit split = fileSplitProvider.getFileSplits()[partition];
+        return new AbstractOperatorNodePushable() {
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                throw new IllegalStateException();
+            }
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                File f = split.getLocalFile().getFile();
+                try {
+                    FileUtils.deleteDirectory(f);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public IFrameWriter getInputFrameWriter(int index) {
+                throw new IllegalStateException();
+            }
+
+            @Override
+            public int getInputArity() {
+                return 0;
+            }
+
+            @Override
+            public void deinitialize() throws HyracksDataException {
+            }
+        };
+    }
+
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index d099645..8410e1e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -52,7 +52,7 @@
         ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
         ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
         ccConfig.defaultMaxJobAttempts = 0;
-        ccConfig.jobHistorySize = 0;
+        ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
 
         // cluster controller
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index e54373f..b8cd953 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -40,6 +40,7 @@
 
     private VLongWritable vid = new VLongWritable();
     private GraphMutationVertex newVertex = null;
+    private DoubleWritable msg = new DoubleWritable(0.0);
 
     @Override
     public void compute(Iterator<DoubleWritable> msgIterator) {
@@ -47,17 +48,20 @@
             if (newVertex == null) {
                 newVertex = new GraphMutationVertex();
             }
-            if (getVertexId().get() % 2 == 0 || getVertexId().get() % 3 == 0) {
-                deleteVertex(getVertexId());
-            } else {
-                vid.set(100 * getVertexId().get());
-                newVertex.setVertexId(vid);
-                newVertex.setVertexValue(getVertexValue());
-                addVertex(vid, newVertex);
+            if (getVertexId().get() < 100) {
+                if ((getVertexId().get() % 2 == 0 || getVertexId().get() % 3 == 0)) {
+                    deleteVertex(getVertexId());
+                } else {
+                    vid.set(100 * getVertexId().get());
+                    newVertex.setVertexId(vid);
+                    newVertex.setVertexValue(getVertexValue());
+                    addVertex(vid, newVertex);
+                    sendMsg(vid, msg);
+                }
             }
             voteToHalt();
         } else {
-            if (getVertexId().get() % 190 == 0) {
+            if (getVertexId().get() == 1900) {
                 deleteVertex(getVertexId());
             }
             voteToHalt();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 00f6f54..7126e8c 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -19,6 +19,7 @@
 
 import junit.framework.TestCase;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -93,17 +94,18 @@
     @Test
     public void test() throws Exception {
         setUp();
-        Plan[] plans = new Plan[] { Plan.OUTER_JOIN };
+        Plan[] plans = new Plan[] { Plan.OUTER_JOIN_SORT, Plan.OUTER_JOIN, Plan.INNER_JOIN, Plan.OUTER_JOIN_SINGLE_SORT };
         for (Plan plan : plans) {
             driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
                     PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
+            compareResults();
         }
-        compareResults();
         tearDown();
         waitawhile();
     }
 
     private void compareResults() throws Exception {
+        FileUtils.deleteQuietly(new File(resultFileDir));
         dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(resultFileDir));
         TestUtils.compareWithResultDir(new File(expectedFileDir), new File(resultFileDir));
     }