Merge branch 'yingyi/fullstack_beta_Fix'
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
index fa5000e..06172d9 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
@@ -55,6 +55,11 @@
             return false;
         }
 
+        //stop rewriting if the operators originates from a nested tuple source
+        if (insideSubplan(opRef)) {
+            return false;
+        }
+
         // We may pull selects above the join we create in order to eliminate possible dependencies between
         // the outer and inner input plans of the join.
         List<ILogicalOperator> topSelects = new ArrayList<ILogicalOperator>();
@@ -286,4 +291,24 @@
         return findPlanPartition((AbstractLogicalOperator) op.getInputs().get(0).getValue(), innerUsedVars,
                 outerUsedVars, innerOps, outerOps, topSelects, belowSecondUnnest);
     }
+
+    /**
+     * check whether the operator is inside a sub-plan
+     * 
+     * @param nestedRootRef
+     * @return true-if it is; false otherwise.
+     */
+    private boolean insideSubplan(Mutable<ILogicalOperator> nestedRootRef) {
+        AbstractLogicalOperator nestedRoot = (AbstractLogicalOperator) nestedRootRef.getValue();
+        if (nestedRoot.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+            return true;
+        }
+        List<Mutable<ILogicalOperator>> inputs = nestedRoot.getInputs();
+        for (Mutable<ILogicalOperator> input : inputs) {
+            if (insideSubplan(input)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
index f017e0f..9becf6e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -48,57 +48,46 @@
 
 /**
  * Factors out common sub-expressions by assigning them to a variables, and replacing the common sub-expressions with references to those variables.
- *
  * Preconditions/Assumptions:
  * Assumes no projects are in the plan. This rule ignores variable reference expressions and constants (other rules deal with those separately).
- * 
  * Postconditions/Examples:
  * Plan with extracted sub-expressions. Generates one assign operator per extracted expression.
- * 
  * Example 1 - Simple Arithmetic Example (simplified)
- * 
  * Before plan:
  * assign [$$1] <- [5 + 6 - 10]
- *   assign [$$0] <- [5 + 6 + 30]
- * 
+ * assign [$$0] <- [5 + 6 + 30]
  * After plan:
  * assign [$$1] <- [$$5 - 10]
- *   assign [$$0] <- [$$5 + 30]
- *     assign [$$5] <- [5 + 6]
- * 
+ * assign [$$0] <- [$$5 + 30]
+ * assign [$$5] <- [5 + 6]
  * Example 2 - Cleaning up 'Distinct By' (simplified)
- * 
  * Before plan: (notice how $$0 is not live after the distinct)
  * assign [$$3] <- [field-access($$0, 1)]
- *   distinct ([%0->$$5])
- *     assign [$$5] <- [field-access($$0, 1)]
- *       unnest $$0 <- [scan-dataset]
- * 
+ * distinct ([%0->$$5])
+ * assign [$$5] <- [field-access($$0, 1)]
+ * unnest $$0 <- [scan-dataset]
  * After plan: (notice how the issue of $$0 is fixed)
  * assign [$$3] <- [$$5]
- *   distinct ([$$5])
- *     assign [$$5] <- [field-access($$0, 1)]
- *       unnest $$0 <- [scan-dataset]
- * 
+ * distinct ([$$5])
+ * assign [$$5] <- [field-access($$0, 1)]
+ * unnest $$0 <- [scan-dataset]
  * Example 3 - Pulling Common Expressions Above Joins (simplified)
- * 
  * Before plan:
  * assign [$$9] <- funcZ(funcY($$8))
- *   join (funcX(funcY($$8)))
- * 
+ * join (funcX(funcY($$8)))
  * After plan:
  * assign [$$9] <- funcZ($$10))
- *   select (funcX($$10))
- *     assign [$$10] <- [funcY($$8)]
- *       join (TRUE)
+ * select (funcX($$10))
+ * assign [$$10] <- [funcY($$8)]
+ * join (TRUE)
  */
 public class ExtractCommonExpressionsRule implements IAlgebraicRewriteRule {
 
     private final List<ILogicalExpression> originalAssignExprs = new ArrayList<ILogicalExpression>();
-    
+
     private final CommonExpressionSubstitutionVisitor substVisitor = new CommonExpressionSubstitutionVisitor();
     private final Map<ILogicalExpression, ExprEquivalenceClass> exprEqClassMap = new HashMap<ILogicalExpression, ExprEquivalenceClass>();
-    
+
     // Set of operators for which common subexpression elimination should not be performed.
     private static final Set<LogicalOperatorTag> ignoreOps = new HashSet<LogicalOperatorTag>();
     static {
@@ -109,9 +98,10 @@
         ignoreOps.add(LogicalOperatorTag.AGGREGATE);
         ignoreOps.add(LogicalOperatorTag.RUNNINGAGGREGATE);
     }
-    
+
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -126,7 +116,8 @@
         return modified;
     }
 
-    private void updateEquivalenceClassMap(LogicalVariable lhs, Mutable<ILogicalExpression> rhsExprRef, ILogicalExpression rhsExpr, ILogicalOperator op) {
+    private void updateEquivalenceClassMap(LogicalVariable lhs, Mutable<ILogicalExpression> rhsExprRef,
+            ILogicalExpression rhsExpr, ILogicalOperator op) {
         ExprEquivalenceClass exprEqClass = exprEqClassMap.get(rhsExpr);
         if (exprEqClass == null) {
             exprEqClass = new ExprEquivalenceClass(op, rhsExprRef);
@@ -141,7 +132,7 @@
         if (context.checkIfInDontApplySet(this, opRef.getValue())) {
             return false;
         }
-        
+
         boolean modified = false;
         // Recurse into children.
         for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
@@ -149,7 +140,7 @@
                 modified = true;
             }
         }
-        
+
         // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map, since we want to avoid incorrect expression replacement
         // (the resulting new variables should be assigned live below a replicate).
         if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE) {
@@ -160,7 +151,7 @@
         if (ignoreOps.contains(op.getOperatorTag())) {
             return modified;
         }
-        
+
         // Remember a copy of the original assign expressions, so we can add them to the equivalence class map
         // after replacing expressions within the assign operator itself.
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
@@ -173,13 +164,13 @@
                 originalAssignExprs.add(expr.cloneExpression());
             }
         }
-        
+
         // Perform common subexpression elimination.
         substVisitor.setOperator(op);
         if (op.acceptExpressionTransform(substVisitor)) {
             modified = true;
         }
-        
+
         // Update equivalence class map.
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
@@ -194,7 +185,7 @@
                 // Update equivalence class map.
                 LogicalVariable lhs = assignOp.getVariables().get(i);
                 updateEquivalenceClassMap(lhs, exprRef, exprRef.getValue(), op);
-                
+
                 // Update equivalence class map with original assign expression.
                 updateEquivalenceClassMap(lhs, exprRef, originalAssignExprs.get(i), op);
             }
@@ -225,35 +216,30 @@
     }
 
     private class CommonExpressionSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
-                
-        private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
-        private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+
         private IOptimizationContext context;
-        private ILogicalOperator op;        
-        
+        private ILogicalOperator op;
+
         public void setContext(IOptimizationContext context) {
             this.context = context;
         }
-        
+
         public void setOperator(ILogicalOperator op) throws AlgebricksException {
             this.op = op;
-            liveVars.clear();
-            usedVars.clear();
         }
-        
+
         @Override
         public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
-            if (liveVars.isEmpty() && usedVars.isEmpty()) {
-                VariableUtilities.getLiveVariables(op, liveVars);
-                VariableUtilities.getUsedVariables(op, usedVars);
-            }
-            
             AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
             boolean modified = false;
             ExprEquivalenceClass exprEqClass = exprEqClassMap.get(expr);
             if (exprEqClass != null) {
                 // Replace common subexpression with existing variable. 
                 if (exprEqClass.variableIsSet()) {
+                    Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+                    List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+                    VariableUtilities.getLiveVariables(op, liveVars);
+                    VariableUtilities.getUsedVariables(op, usedVars);
                     // Check if the replacing variable is live at this op.
                     // However, if the op is already using variables that are not live, then a replacement may enable fixing the plan.
                     // This behavior is necessary to, e.g., properly deal with distinct by.
@@ -266,9 +252,15 @@
                     }
                 } else {
                     if (assignCommonExpression(exprEqClass, expr)) {
-                        exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
-                        // Do not descend into children since this expr has been completely replaced.
-                        return true;
+                        //re-obtain the live vars after rewriting in the method called in the if condition
+                        Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+                        VariableUtilities.getLiveVariables(op, liveVars);
+                        //rewrite only when the variable is live
+                        if (liveVars.contains(exprEqClass.getVariable())) {
+                            exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
+                            // Do not descend into children since this expr has been completely replaced.
+                            return true;
+                        }
                     }
                 }
             } else {
@@ -278,7 +270,7 @@
                     exprEqClassMap.put(expr, exprEqClass);
                 }
             }
-            
+
             // Descend into function arguments.
             if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
                 AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
@@ -290,17 +282,19 @@
             }
             return modified;
         }
-        
-        private boolean assignCommonExpression(ExprEquivalenceClass exprEqClass, ILogicalExpression expr) throws AlgebricksException {
+
+        private boolean assignCommonExpression(ExprEquivalenceClass exprEqClass, ILogicalExpression expr)
+                throws AlgebricksException {
             AbstractLogicalOperator firstOp = (AbstractLogicalOperator) exprEqClass.getFirstOperator();
             Mutable<ILogicalExpression> firstExprRef = exprEqClass.getFirstExpression();
-            if (firstOp.getOperatorTag() == LogicalOperatorTag.INNERJOIN || firstOp.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+            if (firstOp.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+                    || firstOp.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
                 // Do not extract common expressions from within the same join operator.
                 if (firstOp == op) {
                     return false;
                 }
                 AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) firstOp;
-                Mutable<ILogicalExpression> joinCond = joinOp.getCondition();                
+                Mutable<ILogicalExpression> joinCond = joinOp.getCondition();
                 ILogicalExpression enclosingExpr = getEnclosingExpression(joinCond, firstExprRef.getValue());
                 if (enclosingExpr == null) {
                     // No viable enclosing expression that we can pull out from the join.
@@ -312,12 +306,13 @@
                 op.getInputs().get(0).setValue(selectOp);
                 // Set firstOp to be the select below op, since we want to assign the common subexpr there.
                 firstOp = (AbstractLogicalOperator) selectOp;
-            } else if (firstOp.getInputs().size() > 1) { 
+            } else if (firstOp.getInputs().size() > 1) {
                 // Bail for any non-join operator with multiple inputs.
                 return false;
-            }                        
+            }
             LogicalVariable newVar = context.newVar();
-            AssignOperator newAssign = new AssignOperator(newVar, new MutableObject<ILogicalExpression>(firstExprRef.getValue().cloneExpression()));            
+            AssignOperator newAssign = new AssignOperator(newVar, new MutableObject<ILogicalExpression>(firstExprRef
+                    .getValue().cloneExpression()));
             // Place assign below firstOp.
             newAssign.getInputs().add(new MutableObject<ILogicalOperator>(firstOp.getInputs().get(0).getValue()));
             newAssign.setExecutionMode(firstOp.getExecutionMode());
@@ -330,7 +325,8 @@
             return true;
         }
 
-        private ILogicalExpression getEnclosingExpression(Mutable<ILogicalExpression> conditionExprRef, ILogicalExpression commonSubExpr) {
+        private ILogicalExpression getEnclosingExpression(Mutable<ILogicalExpression> conditionExprRef,
+                ILogicalExpression commonSubExpr) {
             ILogicalExpression conditionExpr = conditionExprRef.getValue();
             if (conditionExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
                 return null;
@@ -371,7 +367,7 @@
             return enclosingBoolExpr;
         }
     }
-    
+
     private boolean containsExpr(ILogicalExpression expr, ILogicalExpression searchExpr) {
         if (expr == searchExpr) {
             return true;
@@ -387,7 +383,7 @@
         }
         return false;
     }
-    
+
     private boolean isEqJoinCondition(ILogicalExpression expr) {
         AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
         if (funcExpr.getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.EQ)) {
@@ -400,38 +396,38 @@
         }
         return false;
     }
-    
+
     private final class ExprEquivalenceClass {
         // First operator in which expression is used.
         private final ILogicalOperator firstOp;
-        
+
         // Reference to expression in first op.
         private final Mutable<ILogicalExpression> firstExprRef;
-        
+
         // Variable that this expression has been assigned to.
         private LogicalVariable var;
-        
+
         public ExprEquivalenceClass(ILogicalOperator firstOp, Mutable<ILogicalExpression> firstExprRef) {
             this.firstOp = firstOp;
             this.firstExprRef = firstExprRef;
         }
-        
+
         public ILogicalOperator getFirstOperator() {
             return firstOp;
         }
-        
+
         public Mutable<ILogicalExpression> getFirstExpression() {
             return firstExprRef;
         }
-        
+
         public void setVariable(LogicalVariable var) {
             this.var = var;
         }
-        
+
         public LogicalVariable getVariable() {
             return var;
         }
-        
+
         public boolean variableIsSet() {
             return var != null;
         }
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index e54373f..b8cd953 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -40,6 +40,7 @@
 
     private VLongWritable vid = new VLongWritable();
     private GraphMutationVertex newVertex = null;
+    private DoubleWritable msg = new DoubleWritable(0.0);
 
     @Override
     public void compute(Iterator<DoubleWritable> msgIterator) {
@@ -47,17 +48,20 @@
             if (newVertex == null) {
                 newVertex = new GraphMutationVertex();
             }
-            if (getVertexId().get() % 2 == 0 || getVertexId().get() % 3 == 0) {
-                deleteVertex(getVertexId());
-            } else {
-                vid.set(100 * getVertexId().get());
-                newVertex.setVertexId(vid);
-                newVertex.setVertexValue(getVertexValue());
-                addVertex(vid, newVertex);
+            if (getVertexId().get() < 100) {
+                if ((getVertexId().get() % 2 == 0 || getVertexId().get() % 3 == 0)) {
+                    deleteVertex(getVertexId());
+                } else {
+                    vid.set(100 * getVertexId().get());
+                    newVertex.setVertexId(vid);
+                    newVertex.setVertexValue(getVertexValue());
+                    addVertex(vid, newVertex);
+                    sendMsg(vid, msg);
+                }
             }
             voteToHalt();
         } else {
-            if (getVertexId().get() % 190 == 0) {
+            if (getVertexId().get() == 1900) {
                 deleteVertex(getVertexId());
             }
             voteToHalt();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 00f6f54..7126e8c 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -19,6 +19,7 @@
 
 import junit.framework.TestCase;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -93,17 +94,18 @@
     @Test
     public void test() throws Exception {
         setUp();
-        Plan[] plans = new Plan[] { Plan.OUTER_JOIN };
+        Plan[] plans = new Plan[] { Plan.OUTER_JOIN_SORT, Plan.OUTER_JOIN, Plan.INNER_JOIN, Plan.OUTER_JOIN_SINGLE_SORT };
         for (Plan plan : plans) {
             driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
                     PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
+            compareResults();
         }
-        compareResults();
         tearDown();
         waitawhile();
     }
 
     private void compareResults() throws Exception {
+        FileUtils.deleteQuietly(new File(resultFileDir));
         dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(resultFileDir));
         TestUtils.compareWithResultDir(new File(expectedFileDir), new File(resultFileDir));
     }