Merged hyracks_asterix_stabilization -r 2355:2733

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging_bigmerge_target@2736 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 165fccd..6701385 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -89,4 +89,9 @@
     public IPhysicalPropertiesVector getDeliveredPhysicalProperties();
 
     public void computeDeliveredPhysicalProperties(IOptimizationContext context) throws AlgebricksException;
+    
+    /**
+     * Indicates whether the expressions used by this operator must be variable reference expressions.
+     */
+    public boolean requiresVariableReferenceExpressions();
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 468d25c..0aa1ff6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -51,6 +51,8 @@
      * returns true if op1 and op2 have already been compared
      */
     public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
+    
+    public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
 
     public abstract void addNotToBeInlinedVar(LogicalVariable var);
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
index bf3f82b..b284b22 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
@@ -39,9 +39,11 @@
 
     @Override
     public ScalarFunctionCallExpression cloneExpression() {
-        cloneAnnotations();
         List<Mutable<ILogicalExpression>> clonedArgs = cloneArguments();
-        return new ScalarFunctionCallExpression(finfo, clonedArgs);
+        ScalarFunctionCallExpression funcExpr = new ScalarFunctionCallExpression(finfo, clonedArgs);
+        funcExpr.getAnnotations().putAll(cloneAnnotations());
+        funcExpr.setOpaqueParameters(this.getOpaqueParameters());
+        return funcExpr;
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
index 71932d8..652f9b0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
@@ -45,6 +45,7 @@
         List<Mutable<ILogicalExpression>> clonedArgs = cloneArguments();
         UnnestingFunctionCallExpression ufce = new UnnestingFunctionCallExpression(finfo, clonedArgs);
         ufce.setReturnsUniqueValues(returnsUniqueValues);
+        ufce.setOpaqueParameters(this.getOpaqueParameters());
         return ufce;
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index dc0edfe..64dbdef 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -182,4 +182,9 @@
         return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getNullableTypeComputer(),
                 ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
     }
+    
+    @Override
+    public boolean requiresVariableReferenceExpressions() {
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
index a4dc2e0..4543997 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
@@ -89,4 +89,8 @@
         return env;
     }
 
+    @Override
+    public boolean requiresVariableReferenceExpressions() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
index 3227f3d..3c21c8f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
@@ -106,5 +106,4 @@
         }
         return env;
     }
-
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
index 20aa574..03bfcba 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
@@ -81,4 +81,8 @@
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
 
+    @Override
+    public boolean requiresVariableReferenceExpressions() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
index ee0dcf6..b1da831 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;

 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;

 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;

+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;

 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;

 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;

 

@@ -49,12 +50,34 @@
 

     @Override

     public void recomputeSchema() {

-        schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());

+        if (schema == null) {

+            schema = new ArrayList<LogicalVariable>();

+        }

+        schema.clear();

+        for (Mutable<ILogicalExpression> eRef : expressions) {

+            ILogicalExpression e = eRef.getValue();

+            if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                VariableReferenceExpression v = (VariableReferenceExpression) e;

+                schema.add(v.getVariableReference());

+            }

+        }

     }

 

     @Override

     public VariablePropagationPolicy getVariablePropagationPolicy() {

-        return VariablePropagationPolicy.ALL;

+        return new VariablePropagationPolicy() {

+            @Override

+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)

+                    throws AlgebricksException {

+                for (Mutable<ILogicalExpression> eRef : expressions) {

+                    ILogicalExpression e = eRef.getValue();

+                    if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                        VariableReferenceExpression v = (VariableReferenceExpression) e;

+                        target.addVariable(v.getVariableReference());

+                    }

+                }

+            }

+        };

     }

 

     @Override

@@ -105,7 +128,16 @@
 

     @Override

     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {

-        return createPropagatingAllInputsTypeEnvironment(ctx);

+        IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMetadataProvider());

+        IVariableTypeEnvironment childEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue());

+        for (Mutable<ILogicalExpression> exprRef : expressions) {

+            ILogicalExpression expr = exprRef.getValue();

+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;

+                env.setVarType(varRefExpr.getVariableReference(), childEnv.getType(expr));

+            }

+        }

+        return env;

     }

 

 }

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
index 101b6f5..5aa858f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
@@ -51,7 +51,7 @@
 
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
-        return false;
+        return delegate.acceptExpressionTransform(transform);
     }
 
     @Override
@@ -108,4 +108,13 @@
         return this.createPropagatingAllInputsTypeEnvironment(ctx);
     }
 
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+    
+    public IOperatorExtension getDelegate() {
+        return delegate;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
index 98c3301..0a80337 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
 
+import java.util.Collection;
 import java.util.List;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -42,5 +43,8 @@
     void setPhysicalOperator(IPhysicalOperator physicalOperator);
 
     ExecutionMode getExecutionMode();
-
+    
+    public void getUsedVariables(Collection<LogicalVariable> usedVars);
+    
+    public void getProducedVariables(Collection<LogicalVariable> producedVars);
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java
index 3c6a699..8e67ed9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java
@@ -109,5 +109,4 @@
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
-
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
index 8c611b0..fda920a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
@@ -102,4 +102,8 @@
         return env;
     }
 
+    @Override
+    public boolean requiresVariableReferenceExpressions() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 994c6cb..78b6801 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -249,6 +249,7 @@
 

     @Override

     public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {

+        op.getDelegate().getProducedVariables(producedVariables);

         return null;

     }

 }

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 9295179..a759e35 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -85,7 +85,13 @@
 

     @Override

     public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {

-        standardLayout(op);

+        for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {

+            ILogicalExpression expr = exprRef.getValue();

+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;

+                schemaVariables.add(varRefExpr.getVariableReference());

+            }

+        }

         return null;

     }

 

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 3a82ccd..0ea9367 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;

@@ -33,6 +34,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -49,13 +51,17 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;

 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;

 

 public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> {

@@ -106,8 +112,49 @@
     }

 

     @Override

-    public Void visitExchangeOperator(ExchangeOperator op, Void arg) {

-        // does not use any variable

+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {

+        // Used variables depend on the physical operator.

+        if (op.getPhysicalOperator() != null) {

+            IPhysicalOperator physOp = op.getPhysicalOperator();

+            switch (physOp.getOperatorTag()) {

+                case BROADCAST_EXCHANGE:

+                case ONE_TO_ONE_EXCHANGE:

+                case RANDOM_MERGE_EXCHANGE: {

+                    // No variables used.

+                    break;

+                }

+                case HASH_PARTITION_EXCHANGE: {

+                    HashPartitionExchangePOperator concreteOp = (HashPartitionExchangePOperator) physOp;

+                    usedVariables.addAll(concreteOp.getHashFields());

+                    break;

+                }

+                case HASH_PARTITION_MERGE_EXCHANGE: {

+                    HashPartitionMergeExchangePOperator concreteOp = (HashPartitionMergeExchangePOperator) physOp;

+                    usedVariables.addAll(concreteOp.getPartitionFields());

+                    for (OrderColumn orderCol : concreteOp.getOrderColumns()) {

+                        usedVariables.add(orderCol.getColumn());

+                    }

+                    break;

+                }

+                case SORT_MERGE_EXCHANGE: {

+                    SortMergeExchangePOperator concreteOp = (SortMergeExchangePOperator) physOp;

+                    for (OrderColumn orderCol : concreteOp.getSortColumns()) {

+                        usedVariables.add(orderCol.getColumn());

+                    }

+                    break;

+                }

+                case RANGE_PARTITION_EXCHANGE: {

+                    RangePartitionPOperator concreteOp = (RangePartitionPOperator) physOp;

+                    for (OrderColumn partCol : concreteOp.getPartitioningFields()) {

+                        usedVariables.add(partCol.getColumn());

+                    }

+                    break;

+                }

+                default: {

+                    throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");

+                }

+            }

+        }

         return null;

     }

 

@@ -297,6 +344,7 @@
 

     @Override

     public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {

+        op.getDelegate().getUsedVariables(usedVariables);

         return null;

     }

 

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 25f22e7..f66f99b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -61,14 +61,22 @@
             ITypingContext ctx) throws AlgebricksException {

         substituteVariables(op, v1, v2, true, ctx);

     }

-

+    

+    public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,

+            LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {

+        for (Mutable<ILogicalOperator> childOp : op.getInputs()) {

+            substituteVariablesInDescendantsAndSelf(childOp.getValue(), v1, v2, ctx);

+        }

+        substituteVariables(op, v1, v2, true, ctx);

+    }

+    

     public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,

             boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {

         ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(

                 goThroughNts, ctx);

         op.accept(visitor, new Pair<LogicalVariable, LogicalVariable>(v1, v2));

     }

-

+    

     public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {

         Set<T> varSet = new HashSet<T>();

         Set<T> varArgSet = new HashSet<T>();

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index f3c9e5a..61d4880 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -158,5 +158,13 @@
                 comparatorFactories);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
+    
+    public List<LogicalVariable> getPartitionFields() {
+        return partitionFields;
+    }
+    
+    public List<OrderColumn> getOrderColumns() {
+        return orderColumns;
+    }
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index 7e3c935..8875f6c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -16,6 +16,7 @@
 
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -69,5 +70,9 @@
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
         throw new NotImplementedException();
     }
+    
+    public List<OrderColumn> getPartitioningFields() {
+        return partitioningFields;
+    }
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 7c63e01..738fc7f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -135,6 +135,7 @@
     /*
      * returns true if op1 and op2 have already been compared
      */
+    @Override
     public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2) {
         HashSet<ILogicalOperator> ops = alreadyCompared.get(op1);
         if (ops == null) {
@@ -151,6 +152,11 @@
             }
         }
     }
+    
+    @Override
+    public void removeFromAlreadyCompared(ILogicalOperator op1) {
+        alreadyCompared.remove(op1);
+    }
 
     public void addNotToBeInlinedVar(LogicalVariable var) {
         notToBeInlinedVars.add(var);
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
index 4f6699a..3daf85d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
@@ -69,7 +69,7 @@
         VariableUtilities.getUsedVariables(op, varsUsedInUnnest);
 
         HashSet<LogicalVariable> producedInSubplan = new HashSet<LogicalVariable>();
-        VariableUtilities.getLiveVariables(subplan, producedInSubplan);
+        VariableUtilities.getProducedVariables(subplan, producedInSubplan);
 
         if (!producedInSubplan.containsAll(varsUsedInUnnest)) {
             return false;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
index 4521d1a..fa5000e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
@@ -149,9 +149,15 @@
             // Trivially joinable.
             return true;
         }
-        if (!belowSecondUnnest && op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
-            // Bail on subplan.
-            return false;
+        if (!belowSecondUnnest) {
+            // Bail on the following operators.
+            switch (op.getOperatorTag()) {
+                case AGGREGATE:
+                case SUBPLAN:
+                case GROUP:
+                case UNNEST_MAP:
+                    return false;
+            }
         }
         switch (op.getOperatorTag()) {
             case UNNEST:
@@ -211,7 +217,8 @@
                 for (LogicalVariable producedVar : producedVars) {
                     if (outerUsedVars.contains(producedVar)) {
                         outerMatches++;
-                    } else if (innerUsedVars.contains(producedVar)) {
+                    }
+                    if (innerUsedVars.contains(producedVar)) {
                         innerMatches++;
                     }
                 }
@@ -221,24 +228,30 @@
                     // All produced vars used by outer partition.
                     outerOps.add(op);
                     targetUsedVars = outerUsedVars;
-                } else if (innerMatches == producedVars.size() && !producedVars.isEmpty()) {
+                }
+                if (innerMatches == producedVars.size() && !producedVars.isEmpty()) {
                     // All produced vars used by inner partition.
                     innerOps.add(op);
                     targetUsedVars = innerUsedVars;
-                } else if (innerMatches == 0 && outerMatches == 0) {
+                }
+                if (innerMatches == 0 && outerMatches == 0) {
                     // Op produces variables that are not used in the part of the plan we've seen (or it doesn't produce any vars).
                     // Try to figure out where it belongs by analyzing the used variables.
                     List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
                     VariableUtilities.getUsedVariables(op, usedVars);
                     for (LogicalVariable usedVar : usedVars) {
+                        boolean canBreak = false;
                         if (outerUsedVars.contains(usedVar)) {
                             outerOps.add(op);
                             targetUsedVars = outerUsedVars;
-                            break;
+                            canBreak = true;
                         }
                         if (innerUsedVars.contains(usedVar)) {
                             innerOps.add(op);
                             targetUsedVars = innerUsedVars;
+                            canBreak = true;
+                        }
+                        if (canBreak) {
                             break;
                         }
                     }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
new file mode 100644
index 0000000..cbe2b4a
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -0,0 +1,421 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Factors out common sub-expressions by assigning them to a variables, and replacing the common sub-expressions with references to those variables.
+ *
+ * Preconditions/Assumptions:
+ * Assumes no projects are in the plan. This rule ignores variable reference expressions and constants (other rules deal with those separately).
+ * 
+ * Postconditions/Examples:
+ * Plan with extracted sub-expressions. Generates one assign operator per extracted expression.
+ * 
+ * Example 1 - Simple Arithmetic Example (simplified)
+ * 
+ * Before plan:
+ * assign [$$1] <- [5 + 6 - 10]
+ *   assign [$$0] <- [5 + 6 + 30]
+ * 
+ * After plan:
+ * assign [$$1] <- [$$5 - 10]
+ *   assign [$$0] <- [$$5 + 30]
+ *     assign [$$5] <- [5 + 6]
+ * 
+ * Example 2 - Cleaning up 'Distinct By' (simplified)
+ * 
+ * Before plan: (notice how $$0 is not live after the distinct)
+ * assign [$$3] <- [field-access($$0, 1)]
+ *   distinct ([%0->$$5])
+ *     assign [$$5] <- [field-access($$0, 1)]
+ *       unnest $$0 <- [scan-dataset]
+ * 
+ * After plan: (notice how the issue of $$0 is fixed)
+ * assign [$$3] <- [$$5]
+ *   distinct ([$$5])
+ *     assign [$$5] <- [field-access($$0, 1)]
+ *       unnest $$0 <- [scan-dataset]
+ * 
+ * Example 3 - Pulling Common Expressions Above Joins (simplified)
+ * 
+ * Before plan:
+ * assign [$$9] <- funcZ(funcY($$8))
+ *   join (funcX(funcY($$8)))
+ * 
+ * After plan:
+ * assign [$$9] <- funcZ($$10))
+ *   select (funcX($$10))
+ *     assign [$$10] <- [funcY($$8)]
+ *       join (TRUE)
+ */
+public class ExtractCommonExpressionsRule implements IAlgebraicRewriteRule {
+
+    private final CommonExpressionSubstitutionVisitor substVisitor = new CommonExpressionSubstitutionVisitor();
+    private final Map<ILogicalExpression, ExprEquivalenceClass> exprEqClassMap = new HashMap<ILogicalExpression, ExprEquivalenceClass>();
+    
+    // Set of operators for which common subexpression elimination should not be performed.
+    private static final Set<LogicalOperatorTag> ignoreOps = new HashSet<LogicalOperatorTag>();
+    static {
+        ignoreOps.add(LogicalOperatorTag.UNNEST);
+        ignoreOps.add(LogicalOperatorTag.UNNEST_MAP);
+        ignoreOps.add(LogicalOperatorTag.ORDER);
+        ignoreOps.add(LogicalOperatorTag.PROJECT);
+        ignoreOps.add(LogicalOperatorTag.AGGREGATE);
+        ignoreOps.add(LogicalOperatorTag.RUNNINGAGGREGATE);
+    }
+    
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        exprEqClassMap.clear();
+        substVisitor.setContext(context);
+        boolean modified = removeCommonExpressions(opRef, context);
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());
+        }
+        return modified;
+    }
+
+    private void updateEquivalenceClassMap(LogicalVariable lhs, Mutable<ILogicalExpression> rhsExprRef, ILogicalOperator op) {
+        ExprEquivalenceClass exprEqClass = exprEqClassMap.get(rhsExprRef.getValue());
+        if (exprEqClass == null) {
+            exprEqClass = new ExprEquivalenceClass(op, rhsExprRef);
+            exprEqClassMap.put(rhsExprRef.getValue(), exprEqClass);
+        }
+        exprEqClass.setVariable(lhs);
+    }
+
+    private boolean removeCommonExpressions(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (context.checkIfInDontApplySet(this, opRef.getValue())) {
+            return false;
+        }
+        
+        boolean modified = false;
+        // Recurse into children.
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            if (removeCommonExpressions(inputOpRef, context)) {
+                modified = true;
+            }
+        }
+        
+        // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map, since we want to avoid incorrect expression replacement
+        // (the resulting new variables should be assigned live below a replicate).
+        if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+            exprEqClassMap.clear();
+            return modified;
+        }
+        // Exclude these operators.
+        if (ignoreOps.contains(op.getOperatorTag())) {
+            return modified;
+        }
+        
+        // Perform common subexpression elimination.
+        substVisitor.setOperator(op);
+        if (op.acceptExpressionTransform(substVisitor)) {
+            modified = true;
+        }
+        
+        // Update equivalence class map.
+        if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+            AssignOperator assignOp = (AssignOperator) op;
+            int numVars = assignOp.getVariables().size();
+            for (int i = 0; i < numVars; i++) {
+                Mutable<ILogicalExpression> exprRef = assignOp.getExpressions().get(i);
+                ILogicalExpression expr = exprRef.getValue();
+                if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE
+                        || expr.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+                    continue;
+                }
+                // Update equivalence class map.
+                LogicalVariable lhs = assignOp.getVariables().get(i);
+                updateEquivalenceClassMap(lhs, exprRef, op);
+            }
+        }
+
+        // TODO: For now do not perform replacement in nested plans
+        // due to the complication of figuring out whether the firstOp in an equivalence class is within a subplan, 
+        // and the resulting variable will not be visible to the outside.
+        // Since subplans should be eliminated in most cases, this behavior is acceptable for now.
+        /*
+        if (op.hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans opWithNestedPlan = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan nestedPlan : opWithNestedPlan.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
+                    if (removeCommonExpressions(rootRef, context)) {
+                        modified = true;
+                    }
+                }
+            }
+        }
+        */
+
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+            context.addToDontApplySet(this, op);
+        }
+        return modified;
+    }
+
+    private class CommonExpressionSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
+                
+        private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+        private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+        private IOptimizationContext context;
+        private ILogicalOperator op;        
+        
+        public void setContext(IOptimizationContext context) {
+            this.context = context;
+        }
+        
+        public void setOperator(ILogicalOperator op) throws AlgebricksException {
+            this.op = op;
+            liveVars.clear();
+            usedVars.clear();
+        }
+        
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            if (liveVars.isEmpty() && usedVars.isEmpty()) {
+                VariableUtilities.getLiveVariables(op, liveVars);
+                VariableUtilities.getUsedVariables(op, usedVars);
+            }
+            
+            AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
+            boolean modified = false;
+            ExprEquivalenceClass exprEqClass = exprEqClassMap.get(expr);
+            if (exprEqClass != null) {
+                // Replace common subexpression with existing variable. 
+                if (exprEqClass.variableIsSet()) {
+                    // Check if the replacing variable is live at this op.
+                    // However, if the op is already using variables that are not live, then a replacement may enable fixing the plan.
+                    // This behavior is necessary to, e.g., properly deal with distinct by.
+                    // Also just replace the expr if we are replacing common exprs from within the same operator.
+                    if (liveVars.contains(exprEqClass.getVariable()) || !liveVars.containsAll(usedVars)
+                            || op == exprEqClass.getFirstOperator()) {
+                        exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
+                        // Do not descend into children since this expr has been completely replaced.
+                        return true;
+                    }
+                } else {
+                    if (assignCommonExpression(exprEqClass, expr)) {
+                        exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
+                        // Do not descend into children since this expr has been completely replaced.
+                        return true;
+                    }
+                }
+            } else {
+                if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE
+                        && expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+                    exprEqClass = new ExprEquivalenceClass(op, exprRef);
+                    exprEqClassMap.put(expr, exprEqClass);
+                }
+            }
+            
+            // Descend into function arguments.
+            if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+                for (Mutable<ILogicalExpression> arg : funcExpr.getArguments()) {
+                    if (transform(arg)) {
+                        modified = true;
+                    }
+                }
+            }
+            return modified;
+        }
+        
+        private boolean assignCommonExpression(ExprEquivalenceClass exprEqClass, ILogicalExpression expr) throws AlgebricksException {
+            AbstractLogicalOperator firstOp = (AbstractLogicalOperator) exprEqClass.getFirstOperator();
+            Mutable<ILogicalExpression> firstExprRef = exprEqClass.getFirstExpression();
+            if (firstOp.getOperatorTag() == LogicalOperatorTag.INNERJOIN || firstOp.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+                // Do not extract common expressions from within the same join operator.
+                if (firstOp == op) {
+                    return false;
+                }
+                AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) firstOp;
+                Mutable<ILogicalExpression> joinCond = joinOp.getCondition();                
+                ILogicalExpression enclosingExpr = getEnclosingExpression(joinCond, firstExprRef.getValue());
+                if (enclosingExpr == null) {
+                    // No viable enclosing expression that we can pull out from the join.
+                    return false;
+                }
+                // Place a Select operator beneath op that contains the enclosing expression.
+                SelectOperator selectOp = new SelectOperator(new MutableObject<ILogicalExpression>(enclosingExpr));
+                selectOp.getInputs().add(new MutableObject<ILogicalOperator>(op.getInputs().get(0).getValue()));
+                op.getInputs().get(0).setValue(selectOp);
+                // Set firstOp to be the select below op, since we want to assign the common subexpr there.
+                firstOp = (AbstractLogicalOperator) selectOp;
+            } else if (firstOp.getInputs().size() > 1) { 
+                // Bail for any non-join operator with multiple inputs.
+                return false;
+            }                        
+            LogicalVariable newVar = context.newVar();
+            AssignOperator newAssign = new AssignOperator(newVar, new MutableObject<ILogicalExpression>(firstExprRef.getValue().cloneExpression()));            
+            // Place assign below firstOp.
+            newAssign.getInputs().add(new MutableObject<ILogicalOperator>(firstOp.getInputs().get(0).getValue()));
+            newAssign.setExecutionMode(firstOp.getExecutionMode());
+            firstOp.getInputs().get(0).setValue(newAssign);
+            // Replace original expr with variable reference, and set var in expression equivalence class.
+            firstExprRef.setValue(new VariableReferenceExpression(newVar));
+            exprEqClass.setVariable(newVar);
+            context.computeAndSetTypeEnvironmentForOperator(newAssign);
+            context.computeAndSetTypeEnvironmentForOperator(firstOp);
+            return true;
+        }
+
+        private ILogicalExpression getEnclosingExpression(Mutable<ILogicalExpression> conditionExprRef, ILogicalExpression commonSubExpr) {
+            ILogicalExpression conditionExpr = conditionExprRef.getValue();
+            if (conditionExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                return null;
+            }
+            if (isEqJoinCondition(commonSubExpr)) {
+                // Do not eliminate the common expression if we could use it for an equi-join.
+                return null;
+            }
+            AbstractFunctionCallExpression conditionFuncExpr = (AbstractFunctionCallExpression) conditionExpr;
+            // Boolean expression that encloses the common subexpression.
+            ILogicalExpression enclosingBoolExpr = null;
+            // We are not dealing with arbitrarily nested and/or expressions here.
+            FunctionIdentifier funcIdent = conditionFuncExpr.getFunctionIdentifier();
+            if (funcIdent.equals(AlgebricksBuiltinFunctions.AND) || funcIdent.equals(AlgebricksBuiltinFunctions.OR)) {
+                Iterator<Mutable<ILogicalExpression>> argIter = conditionFuncExpr.getArguments().iterator();
+                while (argIter.hasNext()) {
+                    Mutable<ILogicalExpression> argRef = argIter.next();
+                    if (containsExpr(argRef.getValue(), commonSubExpr)) {
+                        enclosingBoolExpr = argRef.getValue();
+                        // Remove the enclosing expression from the argument list.
+                        // We are going to pull it out into a new select operator.
+                        argIter.remove();
+                        break;
+                    }
+                }
+                // If and/or only has a single argument left, pull it out and remove the and/or function.
+                if (conditionFuncExpr.getArguments().size() == 1) {
+                    conditionExprRef.setValue(conditionFuncExpr.getArguments().get(0).getValue());
+                }
+            } else {
+                if (!containsExpr(conditionExprRef.getValue(), commonSubExpr)) {
+                    return null;
+                }
+                enclosingBoolExpr = conditionFuncExpr;
+                // Replace the enclosing expression with TRUE.
+                conditionExprRef.setValue(ConstantExpression.TRUE);
+            }
+            return enclosingBoolExpr;
+        }
+    }
+    
+    private boolean containsExpr(ILogicalExpression expr, ILogicalExpression searchExpr) {
+        if (expr == searchExpr) {
+            return true;
+        }
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        for (Mutable<ILogicalExpression> argRef : funcExpr.getArguments()) {
+            if (containsExpr(argRef.getValue(), searchExpr)) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    private boolean isEqJoinCondition(ILogicalExpression expr) {
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        if (funcExpr.getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.EQ)) {
+            ILogicalExpression arg1 = funcExpr.getArguments().get(0).getValue();
+            ILogicalExpression arg2 = funcExpr.getArguments().get(1).getValue();
+            if (arg1.getExpressionTag() == LogicalExpressionTag.VARIABLE
+                    && arg2.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    private final class ExprEquivalenceClass {
+        // First operator in which expression is used.
+        private final ILogicalOperator firstOp;
+        
+        // Reference to expression in first op.
+        private final Mutable<ILogicalExpression> firstExprRef;
+        
+        // Variable that this expression has been assigned to.
+        private LogicalVariable var;
+        
+        public ExprEquivalenceClass(ILogicalOperator firstOp, Mutable<ILogicalExpression> firstExprRef) {
+            this.firstOp = firstOp;
+            this.firstExprRef = firstExprRef;
+        }
+        
+        public ILogicalOperator getFirstOperator() {
+            return firstOp;
+        }
+        
+        public Mutable<ILogicalExpression> getFirstExpression() {
+            return firstExprRef;
+        }
+        
+        public void setVariable(LogicalVariable var) {
+            this.var = var;
+        }
+        
+        public LogicalVariable getVariable() {
+            return var;
+        }
+        
+        public boolean variableIsSet() {
+            return var != null;
+        }
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
new file mode 100644
index 0000000..df8ddda
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+
+/**
+ * Inlines variables that are referenced exactly once.
+ * 
+ * Preconditions/Assumptions:
+ * Assumes no projects are in the plan.
+ * 
+ * Example assuming variable $$3 is referenced exactly once.
+ * 
+ * Before plan:
+ * select (funcA($$3))
+ *   ...
+ *     assign [$$3] <- [field-access($$0, 1)]
+ * 
+ * After plan:
+ * select (funcA(field-access($$0, 1))
+ *   ...
+ *     assign [] <- []
+ */
+public class InlineSingleReferenceVariablesRule extends InlineVariablesRule {
+
+    // Maps from variable to a list of operators using that variable.
+    protected Map<LogicalVariable, List<ILogicalOperator>> usedVarsMap = new HashMap<LogicalVariable, List<ILogicalOperator>>();
+    protected List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+    
+    @Override
+    protected void prepare(IOptimizationContext context) {
+        super.prepare(context);
+        usedVarsMap.clear();
+        usedVars.clear();
+    }
+    
+    @Override
+    protected boolean performFinalAction() throws AlgebricksException {
+        boolean modified = false;
+        for (Map.Entry<LogicalVariable, List<ILogicalOperator>> entry : usedVarsMap.entrySet()) {
+            // Perform replacement only if variable is referenced a single time.
+            if (entry.getValue().size() == 1) {
+                ILogicalOperator op = entry.getValue().get(0);
+                if (!op.requiresVariableReferenceExpressions()) {
+                    inlineVisitor.setOperator(op);
+                    inlineVisitor.setTargetVariable(entry.getKey());
+                    if (op.acceptExpressionTransform(inlineVisitor)) {
+                        modified = true;
+                    }
+                    inlineVisitor.setTargetVariable(null);
+                }         
+            }
+        }
+        return modified;
+    }
+
+    @Override
+    protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(op, usedVars);
+        for (LogicalVariable var : usedVars) {
+            List<ILogicalOperator> opsUsingVar = usedVarsMap.get(var);
+            if (opsUsingVar == null) {
+                opsUsingVar = new ArrayList<ILogicalOperator>();
+                usedVarsMap.put(var, opsUsingVar);
+            }
+            opsUsingVar.add(op);
+        }
+        return false;
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 8a79d81..7fed577a 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -15,330 +15,231 @@
 package edu.uci.ics.hyracks.algebricks.rewriter.rules;
 
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
+/**
+ * Replaces variable reference expressions with their assigned function-call expression where applicable
+ * (some variables are generated by datasources).
+ * Inlining variables may enable other optimizations by allowing selects and assigns to be moved
+ * (e.g., a select may be pushed into a join to enable an efficient physical join operator).
+ * 
+ * Preconditions/Assumptions:
+ * Assumes no projects are in the plan. Only inlines variables whose assigned expression is a function call 
+ * (i.e., this rule ignores right-hand side constants and other variable references expressions  
+ * 
+ * Postconditions/Examples:
+ * All qualifying variables have been inlined.
+ * 
+ * Example (simplified):
+ * 
+ * Before plan:
+ * select <- [$$1 < $$2 + $$0]
+ *   assign [$$2] <- [funcZ() + $$0]
+ *     assign [$$0, $$1] <- [funcX(), funcY()]
+ * 
+ * After plan:
+ * select <- [funcY() < funcZ() + funcX() + funcX()]
+ *   assign [$$2] <- [funcZ() + funcX()]
+ *     assign [$$0, $$1] <- [funcX(), funcY()]
+ */
 public class InlineVariablesRule implements IAlgebraicRewriteRule {
 
+    // Map of variables that could be replaced by their producing expression.
+    // Populated during the top-down sweep of the plan.
+    protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<LogicalVariable, ILogicalExpression>();
+
+    // Visitor for replacing variable reference expressions with their originating expression.
+    protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
+    
+    // Set of FunctionIdentifiers that we should not inline.
+    protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<FunctionIdentifier>();
+    
+    protected boolean hasRun = false;
+    
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
         return false;
     }
 
     @Override
-    /**
-     * 
-     * Does one big DFS sweep over the plan.
-     * 
-     */
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (hasRun) {
+            return false;
+        }
         if (context.checkIfInDontApplySet(this, opRef.getValue())) {
             return false;
         }
-        VariableSubstitutionVisitor substVisitor = new VariableSubstitutionVisitor(false);
-        VariableSubstitutionVisitor substVisitorForWrites = new VariableSubstitutionVisitor(true);
-        substVisitor.setContext(context);
-        substVisitorForWrites.setContext(context);
-        Pair<Boolean, Boolean> bb = collectEqClassesAndRemoveRedundantOps(opRef, context, true,
-                new LinkedList<EquivalenceClass>(), substVisitor, substVisitorForWrites);
-        return bb.first;
+        prepare(context);
+        boolean modified = inlineVariables(opRef, context);
+        if (performFinalAction()) {
+            modified = true;
+        }
+        hasRun = true;
+        return modified;
+    }
+    
+    protected void prepare(IOptimizationContext context) {
+        varAssignRhs.clear();
+        inlineVisitor.setContext(context);
+    }
+    
+    protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
+        // Only inline variables in operators that can deal with arbitrary expressions.
+        if (!op.requiresVariableReferenceExpressions()) {
+            inlineVisitor.setOperator(op);
+            return op.acceptExpressionTransform(inlineVisitor);
+        }
+        return false;
     }
 
-    private Pair<Boolean, Boolean> collectEqClassesAndRemoveRedundantOps(Mutable<ILogicalOperator> opRef,
-            IOptimizationContext context, boolean first, List<EquivalenceClass> equivClasses,
-            VariableSubstitutionVisitor substVisitor, VariableSubstitutionVisitor substVisitorForWrites)
+    protected boolean performFinalAction() throws AlgebricksException {
+        return false;
+    }
+    
+    protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        // if (context.checkIfInDontApplySet(this, opRef.getValue())) {
-        // return false;
-        // }
-        if (op.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
-            return new Pair<Boolean, Boolean>(false, false);
-        }
-        boolean modified = false;
-        boolean ecChange = false;
-        int cnt = 0;
-        for (Mutable<ILogicalOperator> i : op.getInputs()) {
-            boolean isOuterInputBranch = op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN && cnt == 1;
-            List<EquivalenceClass> eqc = isOuterInputBranch ? new LinkedList<EquivalenceClass>() : equivClasses;
-
-            Pair<Boolean, Boolean> bb = (collectEqClassesAndRemoveRedundantOps(i, context, false, eqc, substVisitor,
-                    substVisitorForWrites));
-
-            if (bb.first) {
-                modified = true;
-            }
-            if (bb.second) {
-                ecChange = true;
-            }
-
-            if (isOuterInputBranch) {
-                if (AlgebricksConfig.DEBUG) {
-                    AlgebricksConfig.ALGEBRICKS_LOGGER.finest("--- Equivalence classes for inner branch of outer op.: "
-                            + eqc + "\n");
-                }
-                for (EquivalenceClass ec : eqc) {
-                    if (!ec.representativeIsConst()) {
-                        equivClasses.add(ec);
-                    }
-                }
-            }
-
-            ++cnt;
-        }
-        if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans n = (AbstractOperatorWithNestedPlans) op;
-            List<EquivalenceClass> eqc = equivClasses;
-            if (n.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
-                eqc = new LinkedList<EquivalenceClass>();
-            } else {
-                eqc = equivClasses;
-            }
-            for (ILogicalPlan p : n.getNestedPlans()) {
-                for (Mutable<ILogicalOperator> r : p.getRoots()) {
-                    Pair<Boolean, Boolean> bb = collectEqClassesAndRemoveRedundantOps(r, context, false, eqc,
-                            substVisitor, substVisitorForWrites);
-                    if (bb.first) {
-                        modified = true;
-                    }
-                    if (bb.second) {
-                        ecChange = true;
-                    }
-                }
-            }
-        }
-        // we assume a variable is assigned a value only once
+        
+        // Update mapping from variables to expressions during top-down traversal.
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
-            AssignOperator a = (AssignOperator) op;
-            ILogicalExpression rhs = a.getExpressions().get(0).getValue();
-            if (rhs.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                LogicalVariable varLeft = a.getVariables().get(0);
-                VariableReferenceExpression varRef = (VariableReferenceExpression) rhs;
-                LogicalVariable varRight = varRef.getVariableReference();
-
-                EquivalenceClass ecRight = findEquivClass(varRight, equivClasses);
-                if (ecRight != null) {
-                    ecRight.addMember(varLeft);
-                } else {
-                    List<LogicalVariable> m = new LinkedList<LogicalVariable>();
-                    m.add(varRight);
-                    m.add(varLeft);
-                    EquivalenceClass ec = new EquivalenceClass(m, varRight);
-                    equivClasses.add(ec);
-                    if (AlgebricksConfig.DEBUG) {
-                        AlgebricksConfig.ALGEBRICKS_LOGGER.finest("--- New equivalence class: " + ec + "\n");
+            AssignOperator assignOp = (AssignOperator) op;
+            List<LogicalVariable> vars = assignOp.getVariables();
+            List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();            
+            for (int i = 0; i < vars.size(); i++) {
+                ILogicalExpression expr = exprs.get(i).getValue();
+                // Ignore functions that are in the doNotInline set.                
+                if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+                    if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier())) {
+                        continue;
                     }
                 }
-                ecChange = true;
-            } else if (((AbstractLogicalExpression) rhs).getExpressionTag() == LogicalExpressionTag.CONSTANT) {
-                LogicalVariable varLeft = a.getVariables().get(0);
-                List<LogicalVariable> m = new LinkedList<LogicalVariable>();
-                m.add(varLeft);
-                EquivalenceClass ec = new EquivalenceClass(m, (ConstantExpression) rhs);
-                // equivClassesForParent.add(ec);
-                equivClasses.add(ec);
-                ecChange = true;
-            }
-        } else if (op.getOperatorTag() == LogicalOperatorTag.GROUP && !(context.checkIfInDontApplySet(this, op))) {
-            GroupByOperator group = (GroupByOperator) op;
-            Pair<Boolean, Boolean> r1 = processVarExprPairs(group.getGroupByList(), equivClasses);
-            Pair<Boolean, Boolean> r2 = processVarExprPairs(group.getDecorList(), equivClasses);
-            modified = modified || r1.first || r2.first;
-            ecChange = r1.second || r2.second;
-        }
-        if (op.getOperatorTag() == LogicalOperatorTag.PROJECT) {
-            assignVarsNeededByProject((ProjectOperator) op, equivClasses, context);
-        } else {
-            if (op.getOperatorTag() == LogicalOperatorTag.WRITE) {
-                substVisitorForWrites.setEquivalenceClasses(equivClasses);
-                if (op.acceptExpressionTransform(substVisitorForWrites)) {
-                    modified = true;
-                }
-            } else {
-                substVisitor.setEquivalenceClasses(equivClasses);
-                if (op.acceptExpressionTransform(substVisitor)) {
-                    modified = true;
-                    if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
-                        GroupByOperator group = (GroupByOperator) op;
-                        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gp : group.getGroupByList()) {
-                            if (gp.first != null
-                                    && gp.second.getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                                LogicalVariable gv = ((VariableReferenceExpression) gp.second.getValue())
-                                        .getVariableReference();
-                                Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = group
-                                        .getDecorList().iterator();
-                                while (iter.hasNext()) {
-                                    Pair<LogicalVariable, Mutable<ILogicalExpression>> dp = iter.next();
-                                    if (dp.first == null
-                                            && dp.second.getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                                        LogicalVariable dv = ((VariableReferenceExpression) dp.second.getValue())
-                                                .getVariableReference();
-                                        if (dv == gv) {
-                                            // The decor variable is redundant,
-                                            // since it is
-                                            // propagated as a grouping
-                                            // variable.
-                                            EquivalenceClass ec1 = findEquivClass(gv, equivClasses);
-                                            if (ec1 != null) {
-                                                ec1.addMember(gp.first);
-                                                ec1.setVariableRepresentative(gp.first);
-                                            } else {
-                                                List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
-                                                varList.add(gp.first);
-                                                varList.add(gv);
-                                                ec1 = new EquivalenceClass(varList, gp.first);
-                                            }
-                                            iter.remove();
-                                            break;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
+                varAssignRhs.put(vars.get(i), exprs.get(i).getValue());
             }
         }
-        return new Pair<Boolean, Boolean>(modified, ecChange);
-    }
 
-    private Pair<Boolean, Boolean> processVarExprPairs(
-            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairs, List<EquivalenceClass> equivClasses) {
-        boolean ecFromGroup = false;
+        // Descend into children removing projects on the way.
         boolean modified = false;
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : vePairs) {
-            ILogicalExpression expr = p.second.getValue();
-            if (p.first != null && expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
-                LogicalVariable rhsVar = varRef.getVariableReference();
-                ecFromGroup = true;
-                EquivalenceClass ecRight = findEquivClass(rhsVar, equivClasses);
-                if (ecRight != null) {
-                    LogicalVariable replacingVar = ecRight.getVariableRepresentative();
-                    if (replacingVar != null && replacingVar != rhsVar) {
-                        varRef.setVariable(replacingVar);
-                        modified = true;
-                    }
-                }
-            }
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            if (inlineVariables(inputOpRef, context)) {
+                modified = true;
+            }            
         }
-        return new Pair<Boolean, Boolean>(modified, ecFromGroup);
+
+        if (performBottomUpAction(op)) {
+            modified = true;
+        }
+        
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+            context.addToDontApplySet(this, op);
+            // Re-enable rules that we may have already tried. They could be applicable now after inlining.
+            context.removeFromAlreadyCompared(opRef.getValue());
+        }
+
+        return modified;
     }
 
-    // Instead of doing this, we could make Projection to be more expressive and
-    // also take constants (or even expression), at the expense of a more
-    // complex project push down.
-    private void assignVarsNeededByProject(ProjectOperator op, List<EquivalenceClass> equivClasses,
-            IOptimizationContext context) throws AlgebricksException {
-        List<LogicalVariable> prVars = op.getVariables();
-        int sz = prVars.size();
-        for (int i = 0; i < sz; i++) {
-            EquivalenceClass ec = findEquivClass(prVars.get(i), equivClasses);
-            if (ec != null) {
-                if (!ec.representativeIsConst()) {
-                    prVars.set(i, ec.getVariableRepresentative());
-                }
-            }
-        }
-    }
-
-    private final static EquivalenceClass findEquivClass(LogicalVariable var, List<EquivalenceClass> equivClasses) {
-        for (EquivalenceClass ec : equivClasses) {
-            if (ec.contains(var)) {
-                return ec;
-            }
-        }
-        return null;
-    }
-
-    private class VariableSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
-        private List<EquivalenceClass> equivClasses;
+    protected class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
+        
+        private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
+        private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+        private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();        
+        private ILogicalOperator op;
         private IOptimizationContext context;
-        private final boolean doNotSubstWithConst;
-
-        public VariableSubstitutionVisitor(boolean doNotSubstWithConst) {
-            this.doNotSubstWithConst = doNotSubstWithConst;
+        // If set, only replace this variable reference.
+        private LogicalVariable targetVar;
+        
+        public InlineVariablesVisitor(Map<LogicalVariable, ILogicalExpression> varAssignRhs) {
+            this.varAssignRhs = varAssignRhs;
         }
-
+        
+        public void setTargetVariable(LogicalVariable targetVar) {
+            this.targetVar = targetVar;
+        }
+        
         public void setContext(IOptimizationContext context) {
             this.context = context;
         }
 
-        public void setEquivalenceClasses(List<EquivalenceClass> equivClasses) {
-            this.equivClasses = equivClasses;
+        public void setOperator(ILogicalOperator op) throws AlgebricksException {
+            this.op = op;
+            liveVars.clear();
         }
-
+        
         @Override
-        public boolean transform(Mutable<ILogicalExpression> exprRef) {
+        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {            
             ILogicalExpression e = exprRef.getValue();
             switch (((AbstractLogicalExpression) e).getExpressionTag()) {
                 case VARIABLE: {
-                    // look for a required substitution
                     LogicalVariable var = ((VariableReferenceExpression) e).getVariableReference();
+                    // Restrict replacement to targetVar if it has been set.
+                    if (targetVar != null && var != targetVar) {
+                        return false;
+                    }
+                    // Make sure has not been excluded from inlining.
                     if (context.shouldNotBeInlined(var)) {
                         return false;
                     }
-                    EquivalenceClass ec = findEquivClass(var, equivClasses);
-                    if (ec == null) {
+                    ILogicalExpression rhs = varAssignRhs.get(var);
+                    if (rhs == null) {
+                        // Variable was not produced by an assign.
                         return false;
                     }
-                    if (ec.representativeIsConst()) {
-                        if (doNotSubstWithConst) {
-                            return false;
-                        }
-                        exprRef.setValue(ec.getConstRepresentative());
-                        return true;
-                    } else {
-                        LogicalVariable r = ec.getVariableRepresentative();
-                        if (!r.equals(var)) {
-                            exprRef.setValue(new VariableReferenceExpression(r));
-                            return true;
-                        } else {
+                    
+                    // Make sure used variables from rhs are live.
+                    if (liveVars.isEmpty()) {
+                        VariableUtilities.getLiveVariables(op, liveVars);
+                    }
+                    rhsUsedVars.clear();
+                    rhs.getUsedVariables(rhsUsedVars);
+                    for (LogicalVariable rhsUsedVar : rhsUsedVars) {
+                        if (!liveVars.contains(rhsUsedVar)) {
                             return false;
                         }
                     }
+                    
+                    // Replace variable reference with a clone of the rhs expr.
+                    exprRef.setValue(rhs.cloneExpression());
+                    return true;
                 }
                 case FUNCTION_CALL: {
                     AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) e;
-                    boolean m = false;
+                    boolean modified = false;
                     for (Mutable<ILogicalExpression> arg : fce.getArguments()) {
                         if (transform(arg)) {
-                            m = true;
+                            modified = true;
                         }
                     }
-                    return m;
+                    return modified;
                 }
                 default: {
                     return false;
                 }
             }
         }
-
     }
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
index d54833e..431fca1 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
@@ -87,6 +87,7 @@
         projectOp.getInputs().add(new MutableObject<ILogicalOperator>(parentOp));
         opUnion.getInputs().get(branch).setValue(projectOp);
         projectOp.setPhysicalOperator(new StreamProjectPOperator());
+        context.computeAndSetTypeEnvironmentForOperator(projectOp);
         context.computeAndSetTypeEnvironmentForOperator(parentOp);
     }
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
new file mode 100644
index 0000000..a057f4f
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Projects away unused variables at the earliest possible point.
+ * Does a full DFS sweep of the plan adding ProjectOperators in the bottom-up pass.
+ * Also, removes projects that have become useless.
+ * TODO: This rule 'recklessly' adds as many projects as possible, but there is no guarantee
+ * that the overall cost of the plan is reduced since project operators also add a cost.
+ */
+public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
+
+    private final Set<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
+    private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+    private final Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
+    private final List<LogicalVariable> projectVars = new ArrayList<LogicalVariable>();
+    protected boolean hasRun = false;
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (hasRun) {
+            return false;
+        }
+        hasRun = true;
+        return introduceProjects(null, -1, opRef, Collections.<LogicalVariable> emptySet(), context);
+    }
+
+    protected boolean introduceProjects(AbstractLogicalOperator parentOp, int parentInputIndex,
+            Mutable<ILogicalOperator> opRef, Set<LogicalVariable> parentUsedVars, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        boolean modified = false;
+
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(op, usedVars);
+
+        // In the top-down pass, maintain a set of variables that are used in op and all its parents.
+        HashSet<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>();
+        parentsUsedVars.addAll(parentUsedVars);
+        parentsUsedVars.addAll(usedVars);
+
+        // Descend into children.        
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(i);
+            if (introduceProjects(op, i, inputOpRef, parentsUsedVars, context)) {
+                modified = true;
+            }
+        }
+
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+        // In the bottom-up pass, determine which live variables are not used by op's parents.
+        // Such variables are be projected away.
+        liveVars.clear();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        producedVars.clear();
+        VariableUtilities.getProducedVariables(op, producedVars);
+        liveVars.removeAll(producedVars);
+
+        projectVars.clear();
+        for (LogicalVariable liveVar : liveVars) {
+            if (parentsUsedVars.contains(liveVar)) {
+                projectVars.add(liveVar);
+            }
+        }
+
+        // Some of the variables that are live at this op are not used above.
+        if (projectVars.size() != liveVars.size()) {
+            // Add a project operator under each of op's qualifying input branches.
+            for (int i = 0; i < op.getInputs().size(); i++) {
+                ILogicalOperator childOp = op.getInputs().get(i).getValue();
+                liveVars.clear();
+                VariableUtilities.getLiveVariables(childOp, liveVars);
+                List<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+                vars.addAll(projectVars);
+                // Only retain those variables that are live in the i-th input branch.
+                vars.retainAll(liveVars);
+                if (vars.size() != liveVars.size()) {
+                    ProjectOperator projectOp = new ProjectOperator(vars);
+                    projectOp.getInputs().add(new MutableObject<ILogicalOperator>(childOp));
+                    op.getInputs().get(i).setValue(projectOp);
+                    context.computeAndSetTypeEnvironmentForOperator(projectOp);
+                    modified = true;
+                }
+            }
+        } else if (op.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+            // Check if the existing project has become useless.
+            liveVars.clear();
+            VariableUtilities.getLiveVariables(op.getInputs().get(0).getValue(), liveVars);
+            ProjectOperator projectOp = (ProjectOperator) op;
+            List<LogicalVariable> projectVars = projectOp.getVariables();
+            if (liveVars.size() == projectVars.size() && liveVars.containsAll(projectVars)) {
+                boolean eliminateProject = true;
+                // For UnionAll the variables must also be in exactly the correct order.
+                if (parentOp.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+                    eliminateProject = canEliminateProjectBelowUnion((UnionAllOperator) parentOp, projectOp,
+                            parentInputIndex);
+                }
+                if (eliminateProject) {
+                    // The existing project has become useless. Remove it.
+                    parentOp.getInputs().get(parentInputIndex).setValue(op.getInputs().get(0).getValue());
+                }
+            }
+        }
+
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+        return modified;
+    }
+    
+    private boolean canEliminateProjectBelowUnion(UnionAllOperator unionOp, ProjectOperator projectOp,
+            int unionInputIndex) throws AlgebricksException {
+        List<LogicalVariable> orderedLiveVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getLiveVariables(projectOp.getInputs().get(0).getValue(), orderedLiveVars);
+        int numVars = orderedLiveVars.size();
+        for (int i = 0; i < numVars; i++) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varTriple = unionOp.getVariableMappings().get(i);
+            if (unionInputIndex == 0) {
+                if (varTriple.first != orderedLiveVars.get(i)) {
+                    return false;
+                }
+            } else {
+                if (varTriple.second != orderedLiveVars.get(i)) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PullSelectOutOfEqJoin.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PullSelectOutOfEqJoin.java
index 75862cf..8b4f0a1 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PullSelectOutOfEqJoin.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PullSelectOutOfEqJoin.java
@@ -38,9 +38,6 @@
 
 public class PullSelectOutOfEqJoin implements IAlgebraicRewriteRule {
 
-    private List<Mutable<ILogicalExpression>> eqVarVarComps = new ArrayList<Mutable<ILogicalExpression>>();
-    private List<Mutable<ILogicalExpression>> otherPredicates = new ArrayList<Mutable<ILogicalExpression>>();
-
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
         return false;
@@ -66,8 +63,8 @@
         if (!fi.equals(AlgebricksBuiltinFunctions.AND)) {
             return false;
         }
-        eqVarVarComps.clear();
-        otherPredicates.clear();
+        List<Mutable<ILogicalExpression>> eqVarVarComps = new ArrayList<Mutable<ILogicalExpression>>();
+        List<Mutable<ILogicalExpression>> otherPredicates = new ArrayList<Mutable<ILogicalExpression>>();
         for (Mutable<ILogicalExpression> arg : fexp.getArguments()) {
             if (isEqVarVar(arg.getValue())) {
                 eqVarVarComps.add(arg);
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
new file mode 100644
index 0000000..1bcf95a
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
@@ -0,0 +1,149 @@
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pushes an AssignOperator below a UnionAll operator by creating an new AssignOperator below each of 
+ * the UnionAllOperator's branches with appropriate variable replacements.
+ * This rule can help to enable other rules that are difficult to fire across a UnionAllOperator, 
+ * for example, eliminating common sub-expressions.
+ * 
+ * Example:
+ * 
+ * Before plan:
+ * ...
+ * assign [$$20, $$21] <- [funcA($$3), funcB($$6)]
+ *   union ($$1, $$2, $$3) ($$4, $$5, $$6)
+ *     union_branch_0
+ *       ...
+ *     union_branch_1
+ *       ...
+ *     
+ * After plan:
+ * ...
+ * union ($$1, $$2, $$3) ($$4, $$5, $$6) ($$22, $$24, $$20) ($$23, $$25, $$21)
+ *   assign [$$22, $$23] <- [funcA($$1), funcB($$4)]
+ *     union_branch_0
+ *       ...
+ *   assign [$$24, $$25] <- [funcA($$2), funcB($$5)]
+ *     union_branch_1
+ *       ...
+ */
+public class PushAssignBelowUnionAllRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (!op.hasInputs()) {
+            return false;
+        }
+
+        boolean modified = false;
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+            if (childOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+                continue;
+            }
+            AssignOperator assignOp = (AssignOperator) childOp;
+
+            AbstractLogicalOperator childOfChildOp = (AbstractLogicalOperator) assignOp.getInputs().get(0).getValue();
+            if (childOfChildOp.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
+                continue;
+            }
+            UnionAllOperator unionOp = (UnionAllOperator) childOfChildOp;
+
+            Set<LogicalVariable> assignUsedVars = new HashSet<LogicalVariable>();
+            VariableUtilities.getUsedVariables(assignOp, assignUsedVars);
+
+            List<LogicalVariable> assignVars = assignOp.getVariables();
+
+            AssignOperator[] newAssignOps = new AssignOperator[2];
+            for (int j = 0; j < unionOp.getInputs().size(); j++) {
+                newAssignOps[j] = createAssignBelowUnionAllBranch(unionOp, j, assignOp, assignUsedVars, context);
+            }
+            // Add original assign variables to the union variable mappings.
+            for (int j = 0; j < assignVars.size(); j++) {
+                LogicalVariable first = newAssignOps[0].getVariables().get(j);
+                LogicalVariable second = newAssignOps[1].getVariables().get(j);
+                Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(
+                        first, second, assignVars.get(j));
+                unionOp.getVariableMappings().add(varMapping);
+            }
+            context.computeAndSetTypeEnvironmentForOperator(unionOp);
+
+            // Remove original assign operator.
+            op.getInputs().set(i, assignOp.getInputs().get(0));
+            context.computeAndSetTypeEnvironmentForOperator(op);
+            modified = true;
+        }
+
+        return modified;
+    }
+
+    private AssignOperator createAssignBelowUnionAllBranch(UnionAllOperator unionOp, int inputIndex,
+            AssignOperator originalAssignOp, Set<LogicalVariable> assignUsedVars, IOptimizationContext context)
+            throws AlgebricksException {
+        AssignOperator newAssignOp = cloneAssignOperator(originalAssignOp, context);
+        newAssignOp.getInputs()
+                .add(new MutableObject<ILogicalOperator>(unionOp.getInputs().get(inputIndex).getValue()));
+        context.computeAndSetTypeEnvironmentForOperator(newAssignOp);
+        unionOp.getInputs().get(inputIndex).setValue(newAssignOp);
+        int numVarMappings = unionOp.getVariableMappings().size();
+        for (int i = 0; i < numVarMappings; i++) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = unionOp.getVariableMappings().get(i);
+            if (assignUsedVars.contains(varMapping.third)) {
+                LogicalVariable replacementVar;
+                if (inputIndex == 0) {
+                    replacementVar = varMapping.first;
+                } else {
+                    replacementVar = varMapping.second;
+                }
+                VariableUtilities.substituteVariables(newAssignOp, varMapping.third, replacementVar, context);
+            }
+        }
+        return newAssignOp;
+    }
+
+    /**
+     * Clones the given assign operator changing the returned variables to be new ones.
+     * Also, leaves the inputs of the clone clear.
+     */
+    private AssignOperator cloneAssignOperator(AssignOperator assignOp, IOptimizationContext context) {
+        List<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+        List<Mutable<ILogicalExpression>> exprs = new ArrayList<Mutable<ILogicalExpression>>();
+        int numVars = assignOp.getVariables().size();
+        for (int i = 0; i < numVars; i++) {
+            vars.add(context.newVar());
+            exprs.add(new MutableObject<ILogicalExpression>(assignOp.getExpressions().get(i).getValue()
+                    .cloneExpression()));
+        }
+        AssignOperator assignCloneOp = new AssignOperator(vars, exprs);
+        assignCloneOp.setExecutionMode(assignOp.getExecutionMode());
+        return assignCloneOp;
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java
new file mode 100644
index 0000000..16b010e
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pushes function-call expressions below a join if possible.
+ * Assigns the result of such function-calls expressions to new variables, and replaces the original
+ * expression with a corresponding variable reference expression.
+ * This rule can help reduce the cost of computing expensive functions by pushing them below
+ * a join (which may blow up the cardinality).
+ * Also, this rule may help to enable other rules such as common subexpression elimination, again to reduce
+ * the number of calls to expensive functions.
+ * 
+ * Example: (we are pushing pushMeFunc)
+ * 
+ * Before plan:
+ * assign [$$10] <- [funcA(funcB(pushMeFunc($$3, $$4)))]
+ *   join (some condition) 
+ *     join_branch_0 where $$3 and $$4 are not live
+ *       ...
+ *     join_branch_1 where $$3 and $$4 are live
+ *       ...
+ * 
+ * After plan:
+ * assign [$$10] <- [funcA(funcB($$11))]
+ *   join (some condition) 
+ *     join_branch_0 where $$3 and $$4 are not live
+ *       ...
+ *     join_branch_1 where $$3 and $$4 are live
+ *       assign[$$11] <- [pushMeFunc($$3, $$4)]
+ *         ...
+ */
+public class PushFunctionsBelowJoin implements IAlgebraicRewriteRule {
+
+    private final Set<FunctionIdentifier> toPushFuncIdents;
+    private final List<Mutable<ILogicalExpression>> funcExprs = new ArrayList<Mutable<ILogicalExpression>>();
+    private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+    private final List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+
+    public PushFunctionsBelowJoin(Set<FunctionIdentifier> toPushFuncIdents) {
+        this.toPushFuncIdents = toPushFuncIdents;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        AssignOperator assignOp = (AssignOperator) op;
+
+        // Find a join operator below this assign.
+        Mutable<ILogicalOperator> joinOpRef = findJoinOp(assignOp.getInputs().get(0));
+        if (joinOpRef == null) {
+            return false;
+        }
+        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) joinOpRef.getValue();
+
+        // Check if the assign uses a function that we wish to push below the join if possible.
+        funcExprs.clear();
+        gatherFunctionCalls(assignOp, funcExprs);
+        if (funcExprs.isEmpty()) {
+            return false;
+        }
+
+        // Try to push the functions down the input branches of the join.
+        boolean modified = false;
+        if (pushDownFunctions(joinOp, 0, funcExprs, context)) {
+            modified = true;
+        }
+        if (pushDownFunctions(joinOp, 1, funcExprs, context)) {
+            modified = true;
+        }
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(joinOp);
+        }
+        return modified;
+    }
+
+    private Mutable<ILogicalOperator> findJoinOp(Mutable<ILogicalOperator> opRef) {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        switch (op.getOperatorTag()) {
+            case INNERJOIN:
+            case LEFTOUTERJOIN: {
+                return opRef;
+            }
+            // Bail on these operators.
+            case GROUP:
+            case AGGREGATE:
+            case DISTINCT:
+            case UNNEST_MAP: {
+                return null;
+            }
+            // Traverse children.
+            default: {
+                for (Mutable<ILogicalOperator> childOpRef : op.getInputs()) {
+                    return findJoinOp(childOpRef);
+                }
+            }
+        }
+        return null;
+    }
+
+    private void gatherFunctionCalls(AssignOperator assignOp, List<Mutable<ILogicalExpression>> funcExprs) {
+        for (Mutable<ILogicalExpression> exprRef : assignOp.getExpressions()) {
+            gatherFunctionCalls(exprRef, funcExprs);
+        }
+    }
+
+    private void gatherFunctionCalls(Mutable<ILogicalExpression> exprRef, List<Mutable<ILogicalExpression>> funcExprs) {
+        AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return;
+        }
+        // Check whether the function is a function we want to push.
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        if (toPushFuncIdents.contains(funcExpr.getFunctionIdentifier())) {
+            funcExprs.add(exprRef);
+        }
+        // Traverse arguments.
+        for (Mutable<ILogicalExpression> funcArg : funcExpr.getArguments()) {
+            gatherFunctionCalls(funcArg, funcExprs);
+        }
+    }
+
+    private boolean pushDownFunctions(AbstractBinaryJoinOperator joinOp, int inputIndex,
+            List<Mutable<ILogicalExpression>> funcExprs, IOptimizationContext context) throws AlgebricksException {
+        ILogicalOperator joinInputOp = joinOp.getInputs().get(inputIndex).getValue();
+        liveVars.clear();
+        VariableUtilities.getLiveVariables(joinInputOp, liveVars);
+        Iterator<Mutable<ILogicalExpression>> funcIter = funcExprs.iterator();
+        List<LogicalVariable> assignVars = null;
+        List<Mutable<ILogicalExpression>> assignExprs = null;
+        while (funcIter.hasNext()) {
+            Mutable<ILogicalExpression> funcExprRef = funcIter.next();
+            ILogicalExpression funcExpr = funcExprRef.getValue();
+            usedVars.clear();
+            funcExpr.getUsedVariables(usedVars);
+            // Check if we can push the function down this branch.
+            if (liveVars.containsAll(usedVars)) {
+                if (assignVars == null) {
+                    assignVars = new ArrayList<LogicalVariable>();
+                    assignExprs = new ArrayList<Mutable<ILogicalExpression>>();
+                }
+                // Replace the original expression with a variable reference expression.
+                LogicalVariable replacementVar = context.newVar();
+                assignVars.add(replacementVar);
+                assignExprs.add(new MutableObject<ILogicalExpression>(funcExpr));
+                funcExprRef.setValue(new VariableReferenceExpression(replacementVar));
+                funcIter.remove();
+            }
+        }
+        // Create new assign operator below the join if any functions can be pushed.
+        if (assignVars != null) {
+            AssignOperator newAssign = new AssignOperator(assignVars, assignExprs);
+            newAssign.getInputs().add(new MutableObject<ILogicalOperator>(joinInputOp));
+            newAssign.setExecutionMode(joinOp.getExecutionMode());
+            joinOp.getInputs().get(inputIndex).setValue(newAssign);
+            context.computeAndSetTypeEnvironmentForOperator(newAssign);
+            return true;
+        }
+        return false;
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
index 8c679c5..99a6b8c 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
@@ -143,11 +143,11 @@
         if (!intersectsBranch[0] && !intersectsBranch[1]) {
             return false;
         }
+        if (needToPushOps) {
+            pushOps(pushedOnLeft, joinBranchLeftRef, context);
+            pushOps(pushedOnRight, joinBranchRightRef, context);
+        }
         if (intersectsAllBranches) {
-            if (needToPushOps) {
-                pushOps(pushedOnLeft, joinBranchLeftRef, context);
-                pushOps(pushedOnRight, joinBranchRightRef, context);
-            }
             addCondToJoin(select, join, context);
         } else { // push down
             Iterator<Mutable<ILogicalOperator>> branchIter = join.getInputs().iterator();
@@ -156,13 +156,6 @@
                 Mutable<ILogicalOperator> branch = branchIter.next();
                 boolean inter = intersectsBranch[j];
                 if (inter) {
-                    if (needToPushOps) {
-                        if (j == 0) {
-                            pushOps(pushedOnLeft, joinBranchLeftRef, context);
-                        } else {
-                            pushOps(pushedOnRight, joinBranchRightRef, context);
-                        }
-                    }
                     copySelectToBranch(select, branch, context);
                 }
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
new file mode 100644
index 0000000..1106898
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Removes duplicate variables from a group-by operator's decor list.
+ */
+public class RemoveRedundantGroupByDecorVars implements IAlgebraicRewriteRule {
+
+    private final Set<LogicalVariable> vars = new HashSet<LogicalVariable>();
+    
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+            return false;
+        }
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+        vars.clear();
+        
+        boolean modified = false;
+        GroupByOperator groupOp = (GroupByOperator) op;
+        Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = groupOp.getDecorList().iterator();
+        while (iter.hasNext()) {
+            Pair<LogicalVariable, Mutable<ILogicalExpression>> decor = iter.next();
+            if (decor.first != null || decor.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                continue;
+            }
+            VariableReferenceExpression varRefExpr = (VariableReferenceExpression) decor.second.getValue();
+            LogicalVariable var = varRefExpr.getVariableReference();
+            if (vars.contains(var)) {
+                iter.remove();
+                modified = true;
+            } else {
+                vars.add(var);
+            }
+        }
+        if (modified) {
+            context.addToDontApplySet(this, op);
+        }
+        return modified;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
new file mode 100644
index 0000000..ec57be5
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Replaces redundant variable references with their bottom-most equivalent representative.
+ * Does a DFS sweep over the plan keeping track of variable equivalence classes.
+ * For example, this rule would perform the following rewrite.
+ * 
+ * Before Plan:
+ * select (function-call: func, Args:[%0->$$11])
+ *   project [$11]
+ *     assign [$$11] <- [$$10]
+ *       assign [$$10] <- [$$9]
+ *         assign [$$9] <- ...
+ *           ...
+ *           
+ * After Plan:
+ * select (function-call: func, Args:[%0->$$9])
+ *   project [$9]
+ *     assign [$$11] <- [$$9]
+ *       assign [$$10] <- [$$9]
+ *         assign [$$9] <- ...
+ *           ...
+ */
+public class RemoveRedundantVariablesRule implements IAlgebraicRewriteRule {
+
+    private final VariableSubstitutionVisitor substVisitor = new VariableSubstitutionVisitor();
+    private final Map<LogicalVariable, List<LogicalVariable>> equivalentVarsMap = new HashMap<LogicalVariable, List<LogicalVariable>>();
+
+    protected boolean hasRun = false;
+    
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (context.checkIfInDontApplySet(this, opRef.getValue())) {
+            return false;
+        }
+        equivalentVarsMap.clear();
+        boolean modified = removeRedundantVariables(opRef, context);
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());
+        }
+        return modified;
+    }
+
+    private void updateEquivalenceClassMap(LogicalVariable lhs, LogicalVariable rhs) {
+        List<LogicalVariable> equivalentVars = equivalentVarsMap.get(rhs);
+        if (equivalentVars == null) {
+            equivalentVars = new ArrayList<LogicalVariable>();
+            // The first element in the list is the bottom-most representative which will replace all equivalent vars.
+            equivalentVars.add(rhs);
+            equivalentVars.add(lhs);
+            equivalentVarsMap.put(rhs, equivalentVars);
+        }
+        equivalentVarsMap.put(lhs, equivalentVars);
+        equivalentVars.get(0);
+    }
+
+    private boolean removeRedundantVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        boolean modified = false;
+
+        // Recurse into children.
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            if (removeRedundantVariables(inputOpRef, context)) {
+                modified = true;
+            }
+        }
+
+        // Update equivalence class map.
+        if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+            AssignOperator assignOp = (AssignOperator) op;
+            int numVars = assignOp.getVariables().size();
+            for (int i = 0; i < numVars; i++) {
+                ILogicalExpression expr = assignOp.getExpressions().get(i).getValue();
+                if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                    continue;
+                }
+                VariableReferenceExpression rhsVarRefExpr = (VariableReferenceExpression) expr;
+                // Update equivalence class map.
+                LogicalVariable lhs = assignOp.getVariables().get(i);
+                LogicalVariable rhs = rhsVarRefExpr.getVariableReference();
+                updateEquivalenceClassMap(lhs, rhs);
+            }
+        }
+
+        // Replace variable references with their first representative. 
+        if (op.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+            // The project operator does not use expressions, so we need to replace it's variables manually.
+            if (replaceProjectVars((ProjectOperator) op)) {
+                modified = true;
+            }
+        } else if(op.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+            // Replace redundant variables manually in the UnionAll operator.
+            if (replaceUnionAllVars((UnionAllOperator) op)) {
+                modified = true;
+            }
+        } else {
+            if (op.acceptExpressionTransform(substVisitor)) {
+                modified = true;
+            }
+        }
+
+        // Perform variable replacement in nested plans. 
+        if (op.hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans opWithNestedPlan = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan nestedPlan : opWithNestedPlan.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
+                    if (removeRedundantVariables(rootRef, context)) {
+                        modified = true;
+                    }
+                }
+            }
+        }
+
+        // Deal with re-mapping of variables in group by.
+        if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
+            if (handleGroupByVarRemapping((GroupByOperator) op)) {
+                modified = true;
+            }
+        }
+
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+            context.addToDontApplySet(this, op);
+        }
+        return modified;
+    }
+
+    private boolean handleGroupByVarRemapping(GroupByOperator groupOp) {
+        boolean modified = false;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gp : groupOp.getGroupByList()) {
+            if (gp.first == null || gp.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                continue;
+            }
+            LogicalVariable groupByVar = ((VariableReferenceExpression) gp.second.getValue()).getVariableReference();
+            Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = groupOp.getDecorList().iterator();
+            while (iter.hasNext()) {
+                Pair<LogicalVariable, Mutable<ILogicalExpression>> dp = iter.next();
+                if (dp.first != null || dp.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                    continue;
+                }
+                LogicalVariable dv = ((VariableReferenceExpression) dp.second.getValue()).getVariableReference();
+                if (dv == groupByVar) {
+                    // The decor variable is redundant, since it is propagated as a grouping variable.
+                    List<LogicalVariable> equivalentVars = equivalentVarsMap.get(groupByVar);
+                    if (equivalentVars != null) {
+                        // Change representative of this equivalence class.
+                        equivalentVars.set(0, gp.first);
+                        equivalentVarsMap.put(gp.first, equivalentVars);
+                    } else {
+                        updateEquivalenceClassMap(gp.first, groupByVar);
+                    }
+                    iter.remove();
+                    modified = true;
+                    break;
+                }
+            }
+        }
+        return modified;
+    }
+
+    /**
+     * Replace the projects's variables with their corresponding representative
+     * from the equivalence class map (if any).
+     * We cannot use the VariableSubstitutionVisitor here because the project ops
+     * maintain their variables as a list and not as expressions.
+     */
+    private boolean replaceProjectVars(ProjectOperator op) throws AlgebricksException {
+        List<LogicalVariable> vars = op.getVariables();
+        int size = vars.size();
+        boolean modified = false;
+        for (int i = 0; i < size; i++) {
+            LogicalVariable var = vars.get(i);
+            List<LogicalVariable> equivalentVars = equivalentVarsMap.get(var);
+            if (equivalentVars == null) {
+                continue;
+            }
+            // Replace with equivalence class representative.
+            LogicalVariable representative = equivalentVars.get(0);
+            if (representative != var) {
+                vars.set(i, equivalentVars.get(0));
+                modified = true;
+            }
+        }
+        return modified;
+    }
+
+    private boolean replaceUnionAllVars(UnionAllOperator op) throws AlgebricksException {
+        boolean modified = false;
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping : op.getVariableMappings()) {
+            List<LogicalVariable> firstEquivalentVars = equivalentVarsMap.get(varMapping.first);
+            List<LogicalVariable> secondEquivalentVars = equivalentVarsMap.get(varMapping.second);
+            // Replace variables with their representative.
+            if (firstEquivalentVars != null) {
+                varMapping.first = firstEquivalentVars.get(0);
+                modified = true;
+            }
+            if (secondEquivalentVars != null) {
+                varMapping.second = secondEquivalentVars.get(0);
+                modified = true;
+            }
+        }
+        return modified;
+    }
+    
+    private class VariableSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> exprRef) {
+            ILogicalExpression e = exprRef.getValue();
+            switch (((AbstractLogicalExpression) e).getExpressionTag()) {
+                case VARIABLE: {
+                    // Replace variable references with their equivalent representative in the equivalence class map.
+                    VariableReferenceExpression varRefExpr = (VariableReferenceExpression) e;
+                    LogicalVariable var = varRefExpr.getVariableReference();
+                    List<LogicalVariable> equivalentVars = equivalentVarsMap.get(var);
+                    if (equivalentVars == null) {
+                        return false;
+                    }
+                    LogicalVariable representative = equivalentVars.get(0);
+                    if (representative != var) {
+                        varRefExpr.setVariable(representative);
+                        return true;
+                    }
+                    return false;
+                }
+                case FUNCTION_CALL: {
+                    AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) e;
+                    boolean modified = false;
+                    for (Mutable<ILogicalExpression> arg : funcExpr.getArguments()) {
+                        if (transform(arg)) {
+                            modified = true;
+                        }
+                    }
+                    return modified;
+                }
+                default: {
+                    return false;
+                }
+            }
+        }
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index c53ea0a..e0c2741 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -23,6 +23,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -33,10 +34,14 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
+/**
+ * Removes unused variables from Assign, Unnest, Aggregate, and UnionAll operators.
+ */
 public class RemoveUnusedAssignAndAggregateRule implements IAlgebraicRewriteRule {
 
     @Override
@@ -55,7 +60,7 @@
         if (smthToRemove) {
             removeUnusedAssigns(opRef, toRemove, context);
         }
-        return smthToRemove;
+        return !toRemove.isEmpty();
     }
 
     private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> toRemove,
@@ -87,28 +92,59 @@
 
     private int removeFromAssigns(AbstractLogicalOperator op, Set<LogicalVariable> toRemove,
             IOptimizationContext context) throws AlgebricksException {
-        if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
-            AssignOperator assign = (AssignOperator) op;
-            if (removeUnusedVarsAndExprs(toRemove, assign.getVariables(), assign.getExpressions())) {
-                context.computeAndSetTypeEnvironmentForOperator(assign);
+        switch (op.getOperatorTag()) {
+            case ASSIGN: {
+                AssignOperator assign = (AssignOperator) op;
+                if (removeUnusedVarsAndExprs(toRemove, assign.getVariables(), assign.getExpressions())) {
+                    context.computeAndSetTypeEnvironmentForOperator(assign);
+                }
+                return assign.getVariables().size();
             }
-            return assign.getVariables().size();
-        } else if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
-            AggregateOperator agg = (AggregateOperator) op;
-            if (removeUnusedVarsAndExprs(toRemove, agg.getVariables(), agg.getExpressions())) {
-                context.computeAndSetTypeEnvironmentForOperator(agg);
+            case AGGREGATE: {
+                AggregateOperator agg = (AggregateOperator) op;
+                if (removeUnusedVarsAndExprs(toRemove, agg.getVariables(), agg.getExpressions())) {
+                    context.computeAndSetTypeEnvironmentForOperator(agg);
+                }
+                return agg.getVariables().size();
             }
-            return agg.getVariables().size();
-        } else if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
-            UnnestOperator uOp = (UnnestOperator) op;
-            LogicalVariable pVar = uOp.getPositionalVariable();
-            if (pVar != null && toRemove.contains(pVar)) {
-                uOp.setPositionalVariable(null);
+            case UNNEST: {
+                UnnestOperator uOp = (UnnestOperator) op;
+                LogicalVariable pVar = uOp.getPositionalVariable();
+                if (pVar != null && toRemove.contains(pVar)) {
+                    uOp.setPositionalVariable(null);
+                }
+                break;
+            }
+            case UNIONALL: {
+                UnionAllOperator unionOp = (UnionAllOperator) op;
+                if (removeUnusedVarsFromUnionAll(unionOp, toRemove)) {
+                    context.computeAndSetTypeEnvironmentForOperator(unionOp);
+                }
+                return unionOp.getVariableMappings().size();
             }
         }
         return -1;
     }
 
+    private boolean removeUnusedVarsFromUnionAll(UnionAllOperator unionOp, Set<LogicalVariable> toRemove) {
+        Iterator<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> iter = unionOp.getVariableMappings()
+                .iterator();
+        boolean modified = false;
+        Set<LogicalVariable> removeFromRemoveSet = new HashSet<LogicalVariable>();
+        while (iter.hasNext()) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = iter.next();
+            if (toRemove.contains(varMapping.third)) {
+                iter.remove();
+                modified = true;
+            }
+            // In any case, make sure we do not removing these variables.
+            removeFromRemoveSet.add(varMapping.first);
+            removeFromRemoveSet.add(varMapping.second);
+        }
+        toRemove.removeAll(removeFromRemoveSet);
+        return modified;
+    }
+
     private boolean removeUnusedVarsAndExprs(Set<LogicalVariable> toRemove, List<LogicalVariable> varList,
             List<Mutable<ILogicalExpression>> exprList) {
         boolean changed = false;
@@ -142,22 +178,41 @@
                 }
             }
         }
-        if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
-            AssignOperator assign = (AssignOperator) op;
-            toRemove.addAll(assign.getVariables());
-        } else if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
-            AggregateOperator agg = (AggregateOperator) op;
-            toRemove.addAll(agg.getVariables());
-        } else if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
-            UnnestOperator uOp = (UnnestOperator) op;
-            LogicalVariable pVar = uOp.getPositionalVariable();
-            if (pVar != null) {
-                toRemove.add(pVar);
+        boolean removeUsedVars = true;
+        switch (op.getOperatorTag()) {
+            case ASSIGN: {
+                AssignOperator assign = (AssignOperator) op;
+                toRemove.addAll(assign.getVariables());
+                break;
+            }
+            case AGGREGATE: {
+                AggregateOperator agg = (AggregateOperator) op;
+                toRemove.addAll(agg.getVariables());
+                break;
+            }
+            case UNNEST: {
+                UnnestOperator uOp = (UnnestOperator) op;
+                LogicalVariable pVar = uOp.getPositionalVariable();
+                if (pVar != null) {
+                    toRemove.add(pVar);
+                }
+                break;
+            }
+            case UNIONALL: {
+                UnionAllOperator unionOp = (UnionAllOperator) op;
+                for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping : unionOp
+                        .getVariableMappings()) {
+                    toRemove.add(varMapping.third);
+                }
+                removeUsedVars = false;
+                break;
             }
         }
-        List<LogicalVariable> used = new LinkedList<LogicalVariable>();
-        VariableUtilities.getUsedVariables(op, used);
-        toRemove.removeAll(used);
+        if (removeUsedVars) {
+            List<LogicalVariable> used = new LinkedList<LogicalVariable>();
+            VariableUtilities.getUsedVariables(op, used);
+            toRemove.removeAll(used);
+        }
     }
 
 }
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ArrayBackedValueStorage.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ArrayBackedValueStorage.java
index e8dc9b4..7d7d2c1 100644
--- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ArrayBackedValueStorage.java
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ArrayBackedValueStorage.java
@@ -1,34 +1,28 @@
 package edu.uci.ics.hyracks.data.std.util;
 
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 
 import edu.uci.ics.hyracks.data.std.api.IMutableValueStorage;
 import edu.uci.ics.hyracks.data.std.api.IValueReference;
 
 public class ArrayBackedValueStorage implements IMutableValueStorage {
-    private final ByteArrayAccessibleOutputStream baaos;
-    private final DataOutputStream dos;
-
-    public ArrayBackedValueStorage() {
-        baaos = new ByteArrayAccessibleOutputStream();
-        dos = new DataOutputStream(baaos);
-    }
+   
+    private final GrowableArray data = new GrowableArray();
 
     @Override
     public void reset() {
-        baaos.reset();
+        data.reset();
     }
 
     @Override
     public DataOutput getDataOutput() {
-        return dos;
+        return data.getDataOutput();
     }
 
     @Override
     public byte[] getByteArray() {
-        return baaos.getByteArray();
+        return data.getByteArray();
     }
 
     @Override
@@ -38,12 +32,12 @@
 
     @Override
     public int getLength() {
-        return baaos.size();
+        return data.getLength();
     }
 
     public void append(IValueReference value) {
         try {
-            dos.write(value.getByteArray(), value.getStartOffset(), value.getLength());
+            data.append(value);
         } catch (IOException e) {
             e.printStackTrace();
         }
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/GrowableArray.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/GrowableArray.java
new file mode 100644
index 0000000..c174d4e
--- /dev/null
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/GrowableArray.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.data.std.util;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+
+public class GrowableArray implements IDataOutputProvider {
+    private final ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
+    private final DataOutputStream dos = new DataOutputStream(baaos);
+
+    @Override
+    public DataOutput getDataOutput() {
+        return dos;
+    }
+
+    public void reset() {
+        baaos.reset();
+    }
+
+    public byte[] getByteArray() {
+        return baaos.getByteArray();
+    }
+
+    public int getLength() {
+        return baaos.size();
+    }
+
+    public void append(IValueReference value) throws IOException {
+        dos.write(value.getByteArray(), value.getStartOffset(), value.getLength());
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
index 989bc6b..8c865c4 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
@@ -15,14 +15,13 @@
 package edu.uci.ics.hyracks.dataflow.common.comm.io;
 
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 
 /**
  * Array backed tuple builder.
@@ -30,25 +29,21 @@
  * @author vinayakb
  */
 public class ArrayTupleBuilder implements IDataOutputProvider {
-    private final ByteArrayAccessibleOutputStream baaos;
-    private final DataOutputStream dos;
+    private final GrowableArray fieldData = new GrowableArray();
     private final int[] fEndOffsets;
     private int nextField;
 
     public ArrayTupleBuilder(int nFields) {
-        baaos = new ByteArrayAccessibleOutputStream();
-        dos = new DataOutputStream(baaos);
         fEndOffsets = new int[nFields];
     }
 
     /**
      * Resets the builder.
-     * 
      * reset() must be called before attempting to create a new tuple.
      */
     public void reset() {
         nextField = 0;
-        baaos.reset();
+        fieldData.reset();
     }
 
     /**
@@ -66,7 +61,7 @@
      * @return Data byte array.
      */
     public byte[] getByteArray() {
-        return baaos.getByteArray();
+        return fieldData.getByteArray();
     }
 
     /**
@@ -75,7 +70,7 @@
      * @return data area size.
      */
     public int getSize() {
-        return baaos.size();
+        return fieldData.getLength();
     }
 
     /**
@@ -96,14 +91,15 @@
         int fStartOffset = accessor.getFieldStartOffset(tIndex, fIndex);
         int fLen = accessor.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
         try {
-            dos.write(accessor.getBuffer().array(), startOffset + accessor.getFieldSlotsLength() + fStartOffset, fLen);
+            fieldData.getDataOutput().write(accessor.getBuffer().array(),
+                    startOffset + accessor.getFieldSlotsLength() + fStartOffset, fLen);
             if (FrameConstants.DEBUG_FRAME_IO) {
-                dos.writeInt(FrameConstants.FRAME_FIELD_MAGIC);
+                fieldData.getDataOutput().writeInt(FrameConstants.FRAME_FIELD_MAGIC);
             }
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
-        fEndOffsets[nextField++] = baaos.size();
+        fEndOffsets[nextField++] = fieldData.getLength();
     }
 
     /**
@@ -117,8 +113,8 @@
      * @throws HyracksDataException
      */
     public <T> void addField(ISerializerDeserializer<T> serDeser, T instance) throws HyracksDataException {
-        serDeser.serialize(instance, dos);
-        fEndOffsets[nextField++] = baaos.size();
+        serDeser.serialize(instance, fieldData.getDataOutput());
+        fEndOffsets[nextField++] = fieldData.getLength();
     }
 
     /**
@@ -134,11 +130,11 @@
      */
     public void addField(byte[] bytes, int start, int length) throws HyracksDataException {
         try {
-            dos.write(bytes, start, length);
+            fieldData.getDataOutput().write(bytes, start, length);
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
-        fEndOffsets[nextField++] = baaos.size();
+        fEndOffsets[nextField++] = fieldData.getLength();
     }
 
     /**
@@ -146,7 +142,14 @@
      */
     @Override
     public DataOutput getDataOutput() {
-        return dos;
+        return fieldData.getDataOutput();
+    }
+
+    /**
+     * Get the growable array storing the field data.
+     */
+    public GrowableArray getFieldData() {
+        return fieldData;
     }
 
     /**
@@ -156,6 +159,6 @@
      * data.
      */
     public void addFieldEndOffset() {
-        fEndOffsets[nextField++] = baaos.size();
+        fEndOffsets[nextField++] = fieldData.getLength();
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/util/StringUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/util/StringUtils.java
index a29209c..8a30a71 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/util/StringUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/util/StringUtils.java
@@ -18,17 +18,19 @@
 import java.io.IOException;
 
 public class StringUtils {
-    public static void writeCharAsModifiedUTF8(char c, DataOutput dos) throws IOException {
-
+    public static int writeCharAsModifiedUTF8(char c, DataOutput dos) throws IOException {
         if (c >= 0x0000 && c <= 0x007F) {
             dos.writeByte(c);
+            return 1;
         } else if (c <= 0x07FF) {
             dos.writeByte((byte) (0xC0 | ((c >> 6) & 0x3F)));
             dos.writeByte((byte) (0x80 | (c & 0x3F)));
+            return 2;
         } else {
             dos.writeByte((byte) (0xE0 | ((c >> 12) & 0x0F)));
             dos.writeByte((byte) (0x80 | ((c >> 6) & 0x3F)));
             dos.writeByte((byte) (0x80 | (c & 0x3F)));
+            return 3;
         }
     }
 
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index d00bea6..6744f70 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -15,13 +15,13 @@
 
 package edu.uci.ics.hyracks.storage.am.invertedindex.dataflow;
 
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -41,7 +41,7 @@
 
     private FrameTupleAccessor accessor;
     private ArrayTupleBuilder builder;
-    private DataOutput builderDos;
+    private GrowableArray builderFieldData;
     private FrameTupleAppender appender;
     private ByteBuffer writeBuffer;
 
@@ -60,7 +60,7 @@
         accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
         writeBuffer = ctx.allocateFrame();
         builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
-        builderDos = builder.getDataOutput();
+        builderFieldData = builder.getFieldData();
         appender = new FrameTupleAppender(ctx.getFrameSize());
         appender.reset(writeBuffer, true);
         writer.open();
@@ -87,7 +87,7 @@
                     builder.reset();
                     try {
                         IToken token = tokenizer.getToken();
-                        token.serializeToken(builderDos);
+                        token.serializeToken(builderFieldData);
                         builder.addFieldEndOffset();
                     } catch (IOException e) {
                         throw new HyracksDataException(e.getMessage());
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java
index 99e76dc..3525fc3 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java
@@ -202,7 +202,9 @@
     public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
         // Create entry in btree for last inverted list.
         InvertedIndexBulkLoadContext ctx = (InvertedIndexBulkLoadContext) ictx;
-        createAndInsertBTreeTuple(ctx);
+        if (ctx.lastTuple.getFieldData(0) != null) {
+            createAndInsertBTreeTuple(ctx);
+        }
         btree.endBulkLoad(ctx.btreeBulkLoadCtx);
         ctx.deinit();
     }
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/TOccurrenceSearcher.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/TOccurrenceSearcher.java
index 1f886a2..af5dad3 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/TOccurrenceSearcher.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/TOccurrenceSearcher.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.hyracks.storage.am.invertedindex.impls;
 
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -28,6 +27,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -75,7 +75,7 @@
     protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
             new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
     protected ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(queryTokenRecDesc.getFieldCount());
-    protected DataOutput queryTokenDos = queryTokenBuilder.getDataOutput();
+    protected GrowableArray queryTokenFieldData = queryTokenBuilder.getFieldData();
     protected FrameTupleAppender queryTokenAppender;
     protected ByteBuffer queryTokenFrame;
 
@@ -158,7 +158,7 @@
             queryTokenBuilder.reset();
             try {
                 IToken token = queryTokenizer.getToken();
-                token.serializeToken(queryTokenDos);
+                token.serializeToken(queryTokenFieldData);
                 queryTokenBuilder.addFieldEndOffset();
                 // WARNING: assuming one frame is big enough to hold all tokens
                 queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
@@ -294,7 +294,8 @@
         boolean advanceCursor = true;
         boolean advancePrevResult = false;
         int resultTidx = 0;
-
+        currentNumResults = 0;
+        
         resultFrameTupleAcc.reset(prevCurrentBuffer);
         resultFrameTupleApp.reset(newCurrentBuffer, true);
 
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/AbstractUTF8Token.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/AbstractUTF8Token.java
index 65afa65..2f60952 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/AbstractUTF8Token.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/AbstractUTF8Token.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 
 public abstract class AbstractUTF8Token implements IToken {
     public static final int GOLDEN_RATIO_32 = 0x09e3779b9;
@@ -97,8 +98,8 @@
     }
 
     @Override
-    public void serializeTokenCount(DataOutput dos) throws IOException {
-        handleCountTypeTag(dos);
-        dos.writeInt(tokenCount);
+    public void serializeTokenCount(GrowableArray out) throws IOException {
+        handleCountTypeTag(out.getDataOutput());
+        out.getDataOutput().writeInt(tokenCount);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedUTF8NGramToken.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedUTF8NGramToken.java
index b7bb828..a1a4354 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedUTF8NGramToken.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedUTF8NGramToken.java
@@ -19,10 +19,10 @@
 
 package edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 
 public class HashedUTF8NGramToken extends UTF8NGramToken {
     public HashedUTF8NGramToken(byte tokenTypeTag, byte countTypeTag) {
@@ -30,8 +30,8 @@
     }
 
     @Override
-    public void serializeToken(DataOutput dos) throws IOException {
-        handleTokenTypeTag(dos);
+    public void serializeToken(GrowableArray out) throws IOException {
+        handleTokenTypeTag(out.getDataOutput());
 
         int hash = GOLDEN_RATIO_32;
 
@@ -59,6 +59,6 @@
         // token count
         hash += tokenCount;
 
-        dos.writeInt(hash);
+        out.getDataOutput().writeInt(hash);
     }
 }
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedUTF8WordToken.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedUTF8WordToken.java
index 42ed053..20405c6 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedUTF8WordToken.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedUTF8WordToken.java
@@ -19,10 +19,10 @@
 
 package edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 
 public class HashedUTF8WordToken extends UTF8WordToken {
 
@@ -76,12 +76,12 @@
     }
 
     @Override
-    public void serializeToken(DataOutput dos) throws IOException {
+    public void serializeToken(GrowableArray out) throws IOException {
         if (tokenTypeTag > 0) {
-            dos.write(tokenTypeTag);
+            out.getDataOutput().write(tokenTypeTag);
         }
 
         // serialize hash value
-        dos.writeInt(hash);
+        out.getDataOutput().writeInt(hash);
     }
 }
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/IToken.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/IToken.java
index c1840d7..47467a1 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/IToken.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/IToken.java
@@ -19,9 +19,10 @@
 
 package edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
+
 public interface IToken {
 	public byte[] getData();
 
@@ -34,7 +35,7 @@
 	public void reset(byte[] data, int start, int length, int tokenLength,
 			int tokenCount);
 
-	public void serializeToken(DataOutput dos) throws IOException;
+	public void serializeToken(GrowableArray out) throws IOException;
 
-	public void serializeTokenCount(DataOutput dos) throws IOException;
+	public void serializeTokenCount(GrowableArray out) throws IOException;
 }
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/UTF8NGramToken.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/UTF8NGramToken.java
index 59cadc8..8cb9818 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/UTF8NGramToken.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/UTF8NGramToken.java
@@ -19,10 +19,10 @@
 
 package edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 import edu.uci.ics.hyracks.dataflow.common.data.util.StringUtils;
 
 public class UTF8NGramToken extends AbstractUTF8Token implements INGramToken {
@@ -49,34 +49,39 @@
     }
 
     @Override
-    public void serializeToken(DataOutput dos) throws IOException {
-        handleTokenTypeTag(dos);
+    public void serializeToken(GrowableArray out) throws IOException {
+        handleTokenTypeTag(out.getDataOutput());
+        int tokenUTF8LenOff = out.getLength();
 
         // regular chars
         int numRegChars = tokenLength - numPreChars - numPostChars;
 
         // assuming pre and post char need 1-byte each in utf8
-        int tokenUTF8Len = getLowerCaseUTF8Len(numRegChars) + numPreChars + numPostChars;
+        int tokenUTF8Len = numPreChars + numPostChars;
 
-        // write utf8 length indicator
-        StringUtils.writeUTF8Len(tokenUTF8Len, dos);
+        // Write dummy UTF length which will be correctly set later.
+        out.getDataOutput().writeShort(0);
 
         // pre chars
         for (int i = 0; i < numPreChars; i++) {
-            StringUtils.writeCharAsModifiedUTF8(PRECHAR, dos);
+            StringUtils.writeCharAsModifiedUTF8(PRECHAR, out.getDataOutput());
         }
 
         int pos = start;
         for (int i = 0; i < numRegChars; i++) {
             char c = Character.toLowerCase(UTF8StringPointable.charAt(data, pos));
-            StringUtils.writeCharAsModifiedUTF8(c, dos);
+            tokenUTF8Len += StringUtils.writeCharAsModifiedUTF8(c, out.getDataOutput());
             pos += UTF8StringPointable.charSize(data, pos);
         }
 
         // post chars
         for (int i = 0; i < numPostChars; i++) {
-            StringUtils.writeCharAsModifiedUTF8(POSTCHAR, dos);
+            StringUtils.writeCharAsModifiedUTF8(POSTCHAR, out.getDataOutput());
         }
+
+        // Set UTF length of token.
+        out.getByteArray()[tokenUTF8LenOff] = (byte) ((tokenUTF8Len >>> 8) & 0xFF);
+        out.getByteArray()[tokenUTF8LenOff + 1] = (byte) ((tokenUTF8Len >>> 0) & 0xFF);
     }
 
     public void setNumPrePostChars(int numPreChars, int numPostChars) {
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/UTF8WordToken.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/UTF8WordToken.java
index 97a1e12..9d7fe7c 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/UTF8WordToken.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/UTF8WordToken.java
@@ -19,10 +19,10 @@
 
 package edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 import edu.uci.ics.hyracks.dataflow.common.data.util.StringUtils;
 
 public class UTF8WordToken extends AbstractUTF8Token {
@@ -32,16 +32,20 @@
     }
 
     @Override
-    public void serializeToken(DataOutput dos) throws IOException {
-        handleTokenTypeTag(dos);
-
-        int tokenUTF8Len = getLowerCaseUTF8Len(tokenLength);
-        StringUtils.writeUTF8Len(tokenUTF8Len, dos);
+    public void serializeToken(GrowableArray out) throws IOException {
+        handleTokenTypeTag(out.getDataOutput());
+        int tokenUTF8LenOff = out.getLength();
+        int tokenUTF8Len = 0;
+        // Write dummy UTF length which will be correctly set later.
+        out.getDataOutput().writeShort(0);
         int pos = start;
         for (int i = 0; i < tokenLength; i++) {
             char c = Character.toLowerCase(UTF8StringPointable.charAt(data, pos));
-            StringUtils.writeCharAsModifiedUTF8(c, dos);
+            tokenUTF8Len += StringUtils.writeCharAsModifiedUTF8(c, out.getDataOutput());
             pos += UTF8StringPointable.charSize(data, pos);
         }
+        // Set UTF length of token.
+        out.getByteArray()[tokenUTF8LenOff] = (byte) ((tokenUTF8Len >>> 8) & 0xFF);
+        out.getByteArray()[tokenUTF8LenOff + 1] = (byte) ((tokenUTF8Len >>> 0) & 0xFF);
     }
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
index 9818dce..d9b7b97 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -40,9 +40,10 @@
             IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
-            IIndexDataflowHelperFactory dataflowHelperFactory, boolean retainInput, IOperationCallbackProvider opCallbackProvider) {
+            IIndexDataflowHelperFactory dataflowHelperFactory, boolean retainInput,
+            IOperationCallbackProvider opCallbackProvider) {
         super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, dataflowHelperFactory, null, false, opCallbackProvider);
+                comparatorFactories, dataflowHelperFactory, null, retainInput, opCallbackProvider);
         this.keyFields = keyFields;
     }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/NGramTokenizerTest.java b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/NGramTokenizerTest.java
index 5f15a91..3fb6407 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/NGramTokenizerTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/NGramTokenizerTest.java
@@ -33,6 +33,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.AbstractUTF8Token;
 import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.HashedUTF8NGramTokenFactory;
 import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.IToken;
@@ -41,207 +42,196 @@
 
 public class NGramTokenizerTest {
 
-	private char PRECHAR = '#';
-	private char POSTCHAR = '$';
+    private char PRECHAR = '#';
+    private char POSTCHAR = '$';
 
-	private String str = "Jürgen S. Generic's Car";
-	private byte[] inputBuffer;
+    private String str = "Jürgen S. Generic's Car";
+    private byte[] inputBuffer;
 
-	private int gramLength = 3;
+    private int gramLength = 3;
 
-	private void getExpectedGrams(String s, int gramLength,
-			ArrayList<String> grams, boolean prePost) {
+    private void getExpectedGrams(String s, int gramLength, ArrayList<String> grams, boolean prePost) {
 
-		String tmp = s.toLowerCase();
-		if (prePost) {
-			StringBuilder preBuilder = new StringBuilder();
-			for (int i = 0; i < gramLength - 1; i++) {
-				preBuilder.append(PRECHAR);
-			}
-			String pre = preBuilder.toString();
+        String tmp = s.toLowerCase();
+        if (prePost) {
+            StringBuilder preBuilder = new StringBuilder();
+            for (int i = 0; i < gramLength - 1; i++) {
+                preBuilder.append(PRECHAR);
+            }
+            String pre = preBuilder.toString();
 
-			StringBuilder postBuilder = new StringBuilder();
-			for (int i = 0; i < gramLength - 1; i++) {
-				postBuilder.append(POSTCHAR);
-			}
-			String post = postBuilder.toString();
+            StringBuilder postBuilder = new StringBuilder();
+            for (int i = 0; i < gramLength - 1; i++) {
+                postBuilder.append(POSTCHAR);
+            }
+            String post = postBuilder.toString();
 
-			tmp = pre + s.toLowerCase() + post;
-		}
+            tmp = pre + s.toLowerCase() + post;
+        }
 
-		for (int i = 0; i < tmp.length() - gramLength + 1; i++) {
-			String gram = tmp.substring(i, i + gramLength);
-			grams.add(gram);
-		}
-	}
+        for (int i = 0; i < tmp.length() - gramLength + 1; i++) {
+            String gram = tmp.substring(i, i + gramLength);
+            grams.add(gram);
+        }
+    }
 
-	@Before
-	public void init() throws Exception {
-		// serialize string into bytes
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutput dos = new DataOutputStream(baos);
-		dos.writeUTF(str);
-		inputBuffer = baos.toByteArray();
-	}
+    @Before
+    public void init() throws Exception {
+        // serialize string into bytes
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutput dos = new DataOutputStream(baos);
+        dos.writeUTF(str);
+        inputBuffer = baos.toByteArray();
+    }
 
-	void runTestNGramTokenizerWithCountedHashedUTF8Tokens(boolean prePost)
-			throws IOException {
-		HashedUTF8NGramTokenFactory tokenFactory = new HashedUTF8NGramTokenFactory();
-		NGramUTF8StringBinaryTokenizer tokenizer = new NGramUTF8StringBinaryTokenizer(
-				gramLength, prePost, false, false, tokenFactory);
-		tokenizer.reset(inputBuffer, 0, inputBuffer.length);
+    void runTestNGramTokenizerWithCountedHashedUTF8Tokens(boolean prePost) throws IOException {
+        HashedUTF8NGramTokenFactory tokenFactory = new HashedUTF8NGramTokenFactory();
+        NGramUTF8StringBinaryTokenizer tokenizer = new NGramUTF8StringBinaryTokenizer(gramLength, prePost, false,
+                false, tokenFactory);
+        tokenizer.reset(inputBuffer, 0, inputBuffer.length);
 
-		ArrayList<String> expectedGrams = new ArrayList<String>();
-		getExpectedGrams(str, gramLength, expectedGrams, prePost);
-		ArrayList<Integer> expectedHashedGrams = new ArrayList<Integer>();
-		HashMap<String, Integer> gramCounts = new HashMap<String, Integer>();
-		for (String s : expectedGrams) {
-			Integer count = gramCounts.get(s);
-			if (count == null) {
-				count = 1;
-				gramCounts.put(s, count);
-			} else {
-				count++;
-			}
+        ArrayList<String> expectedGrams = new ArrayList<String>();
+        getExpectedGrams(str, gramLength, expectedGrams, prePost);
+        ArrayList<Integer> expectedHashedGrams = new ArrayList<Integer>();
+        HashMap<String, Integer> gramCounts = new HashMap<String, Integer>();
+        for (String s : expectedGrams) {
+            Integer count = gramCounts.get(s);
+            if (count == null) {
+                count = 1;
+                gramCounts.put(s, count);
+            } else {
+                count++;
+            }
 
-			int hash = tokenHash(s, count);
-			expectedHashedGrams.add(hash);
-		}
+            int hash = tokenHash(s, count);
+            expectedHashedGrams.add(hash);
+        }
 
-		int tokenCount = 0;
+        int tokenCount = 0;
 
-		while (tokenizer.hasNext()) {
-			tokenizer.next();
+        while (tokenizer.hasNext()) {
+            tokenizer.next();
 
-			// serialize hashed token
-			ByteArrayOutputStream tokenBaos = new ByteArrayOutputStream();
-			DataOutput tokenDos = new DataOutputStream(tokenBaos);
+            // serialize hashed token
+            GrowableArray tokenStorage = new GrowableArray();
 
-			IToken token = tokenizer.getToken();
-			token.serializeToken(tokenDos);
+            IToken token = tokenizer.getToken();
+            token.serializeToken(tokenStorage);
 
-			// deserialize token
-			ByteArrayInputStream bais = new ByteArrayInputStream(
-					tokenBaos.toByteArray());
-			DataInput in = new DataInputStream(bais);
+            // deserialize token
+            ByteArrayInputStream bais = new ByteArrayInputStream(tokenStorage.getByteArray());
+            DataInput in = new DataInputStream(bais);
 
-			Integer hashedGram = in.readInt();
+            Integer hashedGram = in.readInt();
 
-			// System.out.println(hashedGram);
+            // System.out.println(hashedGram);
 
-			Assert.assertEquals(expectedHashedGrams.get(tokenCount), hashedGram);
+            Assert.assertEquals(expectedHashedGrams.get(tokenCount), hashedGram);
 
-			tokenCount++;
-		}
-		// System.out.println("---------");
-	}
+            tokenCount++;
+        }
+        // System.out.println("---------");
+    }
 
-	void runTestNGramTokenizerWithHashedUTF8Tokens(boolean prePost)
-			throws IOException {
-		HashedUTF8NGramTokenFactory tokenFactory = new HashedUTF8NGramTokenFactory();
-		NGramUTF8StringBinaryTokenizer tokenizer = new NGramUTF8StringBinaryTokenizer(
-				gramLength, prePost, true, false, tokenFactory);
-		tokenizer.reset(inputBuffer, 0, inputBuffer.length);
+    void runTestNGramTokenizerWithHashedUTF8Tokens(boolean prePost) throws IOException {
+        HashedUTF8NGramTokenFactory tokenFactory = new HashedUTF8NGramTokenFactory();
+        NGramUTF8StringBinaryTokenizer tokenizer = new NGramUTF8StringBinaryTokenizer(gramLength, prePost, true, false,
+                tokenFactory);
+        tokenizer.reset(inputBuffer, 0, inputBuffer.length);
 
-		ArrayList<String> expectedGrams = new ArrayList<String>();
-		getExpectedGrams(str, gramLength, expectedGrams, prePost);
-		ArrayList<Integer> expectedHashedGrams = new ArrayList<Integer>();
-		for (String s : expectedGrams) {
-			int hash = tokenHash(s, 1);
-			expectedHashedGrams.add(hash);
-		}
+        ArrayList<String> expectedGrams = new ArrayList<String>();
+        getExpectedGrams(str, gramLength, expectedGrams, prePost);
+        ArrayList<Integer> expectedHashedGrams = new ArrayList<Integer>();
+        for (String s : expectedGrams) {
+            int hash = tokenHash(s, 1);
+            expectedHashedGrams.add(hash);
+        }
 
-		int tokenCount = 0;
+        int tokenCount = 0;
 
-		while (tokenizer.hasNext()) {
-			tokenizer.next();
+        while (tokenizer.hasNext()) {
+            tokenizer.next();
 
-			// serialize hashed token
-			ByteArrayOutputStream tokenBaos = new ByteArrayOutputStream();
-			DataOutput tokenDos = new DataOutputStream(tokenBaos);
+            // serialize hashed token
+            GrowableArray tokenStorage = new GrowableArray();
 
-			IToken token = tokenizer.getToken();
-			token.serializeToken(tokenDos);
+            IToken token = tokenizer.getToken();
+            token.serializeToken(tokenStorage);
 
-			// deserialize token
-			ByteArrayInputStream bais = new ByteArrayInputStream(
-					tokenBaos.toByteArray());
-			DataInput in = new DataInputStream(bais);
+            // deserialize token
+            ByteArrayInputStream bais = new ByteArrayInputStream(tokenStorage.getByteArray());
+            DataInput in = new DataInputStream(bais);
 
-			Integer hashedGram = in.readInt();
+            Integer hashedGram = in.readInt();
 
-			// System.out.println(hashedGram);
+            // System.out.println(hashedGram);
 
-			Assert.assertEquals(expectedHashedGrams.get(tokenCount), hashedGram);
+            Assert.assertEquals(expectedHashedGrams.get(tokenCount), hashedGram);
 
-			tokenCount++;
-		}
-		// System.out.println("---------");
-	}
+            tokenCount++;
+        }
+        // System.out.println("---------");
+    }
 
-	void runTestNGramTokenizerWithUTF8Tokens(boolean prePost)
-			throws IOException {
-		UTF8NGramTokenFactory tokenFactory = new UTF8NGramTokenFactory();
-		NGramUTF8StringBinaryTokenizer tokenizer = new NGramUTF8StringBinaryTokenizer(
-				gramLength, prePost, true, false, tokenFactory);
-		tokenizer.reset(inputBuffer, 0, inputBuffer.length);
+    void runTestNGramTokenizerWithUTF8Tokens(boolean prePost) throws IOException {
+        UTF8NGramTokenFactory tokenFactory = new UTF8NGramTokenFactory();
+        NGramUTF8StringBinaryTokenizer tokenizer = new NGramUTF8StringBinaryTokenizer(gramLength, prePost, true, false,
+                tokenFactory);
+        tokenizer.reset(inputBuffer, 0, inputBuffer.length);
 
-		ArrayList<String> expectedGrams = new ArrayList<String>();
-		getExpectedGrams(str, gramLength, expectedGrams, prePost);
+        ArrayList<String> expectedGrams = new ArrayList<String>();
+        getExpectedGrams(str, gramLength, expectedGrams, prePost);
 
-		int tokenCount = 0;
+        int tokenCount = 0;
 
-		while (tokenizer.hasNext()) {
-			tokenizer.next();
+        while (tokenizer.hasNext()) {
+            tokenizer.next();
 
-			// serialize hashed token
-			ByteArrayOutputStream tokenBaos = new ByteArrayOutputStream();
-			DataOutput tokenDos = new DataOutputStream(tokenBaos);
+            // serialize hashed token
+            GrowableArray tokenStorage = new GrowableArray();
 
-			IToken token = tokenizer.getToken();
-			token.serializeToken(tokenDos);
+            IToken token = tokenizer.getToken();
+            token.serializeToken(tokenStorage);
 
-			// deserialize token
-			ByteArrayInputStream bais = new ByteArrayInputStream(
-					tokenBaos.toByteArray());
-			DataInput in = new DataInputStream(bais);
+            // deserialize token
+            ByteArrayInputStream bais = new ByteArrayInputStream(tokenStorage.getByteArray());
+            DataInput in = new DataInputStream(bais);
 
-			String strGram = in.readUTF();
+            String strGram = in.readUTF();
 
-			// System.out.println("\"" + strGram + "\"");
+            // System.out.println("\"" + strGram + "\"");
 
-			Assert.assertEquals(expectedGrams.get(tokenCount), strGram);
+            Assert.assertEquals(expectedGrams.get(tokenCount), strGram);
 
-			tokenCount++;
-		}
-		// System.out.println("---------");
-	}
+            tokenCount++;
+        }
+        // System.out.println("---------");
+    }
 
-	@Test
-	public void testNGramTokenizerWithCountedHashedUTF8Tokens()
-			throws Exception {
-		runTestNGramTokenizerWithCountedHashedUTF8Tokens(false);
-		runTestNGramTokenizerWithCountedHashedUTF8Tokens(true);
-	}
+    @Test
+    public void testNGramTokenizerWithCountedHashedUTF8Tokens() throws Exception {
+        runTestNGramTokenizerWithCountedHashedUTF8Tokens(false);
+        runTestNGramTokenizerWithCountedHashedUTF8Tokens(true);
+    }
 
-	@Test
-	public void testNGramTokenizerWithHashedUTF8Tokens() throws Exception {
-		runTestNGramTokenizerWithHashedUTF8Tokens(false);
-		runTestNGramTokenizerWithHashedUTF8Tokens(true);
-	}
+    @Test
+    public void testNGramTokenizerWithHashedUTF8Tokens() throws Exception {
+        runTestNGramTokenizerWithHashedUTF8Tokens(false);
+        runTestNGramTokenizerWithHashedUTF8Tokens(true);
+    }
 
-	@Test
-	public void testNGramTokenizerWithUTF8Tokens() throws IOException {
-		runTestNGramTokenizerWithUTF8Tokens(false);
-		runTestNGramTokenizerWithUTF8Tokens(true);
-	}
+    @Test
+    public void testNGramTokenizerWithUTF8Tokens() throws IOException {
+        runTestNGramTokenizerWithUTF8Tokens(false);
+        runTestNGramTokenizerWithUTF8Tokens(true);
+    }
 
-	public int tokenHash(String token, int tokenCount) {
-		int h = AbstractUTF8Token.GOLDEN_RATIO_32;
-		for (int i = 0; i < token.length(); i++) {
-			h ^= token.charAt(i);
-			h *= AbstractUTF8Token.GOLDEN_RATIO_32;
-		}
-		return h + tokenCount;
-	}
+    public int tokenHash(String token, int tokenCount) {
+        int h = AbstractUTF8Token.GOLDEN_RATIO_32;
+        for (int i = 0; i < token.length(); i++) {
+            h ^= token.charAt(i);
+            h *= AbstractUTF8Token.GOLDEN_RATIO_32;
+        }
+        return h + tokenCount;
+    }
 }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/SearchTest.java b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/SearchTest.java
index c3c9b99..47a068b 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/SearchTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/SearchTest.java
@@ -27,6 +27,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -112,20 +113,19 @@
 	}
 
 	private class TokenIdPair implements Comparable<TokenIdPair> {
-		public ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
-		public DataOutputStream dos = new DataOutputStream(baaos);
+	    public final GrowableArray tokenStorage = new GrowableArray();
 		public int id;
 
 		TokenIdPair(IToken token, int id) throws IOException {
-			token.serializeToken(dos);
+			token.serializeToken(tokenStorage);
 			this.id = id;
 		}
 
 		@Override
 		public int compareTo(TokenIdPair o) {
-			int cmp = btreeBinCmps[0].compare(baaos.getByteArray(), 0,
-					baaos.getByteArray().length, o.baaos.getByteArray(), 0,
-					o.baaos.getByteArray().length);
+			int cmp = btreeBinCmps[0].compare(tokenStorage.getByteArray(), 0,
+			        tokenStorage.getByteArray().length, o.tokenStorage.getByteArray(), 0,
+					o.tokenStorage.getByteArray().length);
 			if (cmp == 0) {
 				return id - o.id;
 			} else {
@@ -157,8 +157,8 @@
 
 		for (TokenIdPair t : pairs) {
 			tb.reset();
-			tb.addField(t.baaos.getByteArray(), 0,
-					t.baaos.getByteArray().length);
+			tb.addField(t.tokenStorage.getByteArray(), 0,
+					t.tokenStorage.getByteArray().length);
 			IntegerSerializerDeserializer.INSTANCE.serialize(t.id, tb.getDataOutput());
 			tb.addFieldEndOffset();
 			tuple.reset(tb.getFieldEndOffsets(), tb.getByteArray());
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/WordTokenizerTest.java b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/WordTokenizerTest.java
index 53fb96d..810c5f5 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/WordTokenizerTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/WordTokenizerTest.java
@@ -34,6 +34,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import edu.uci.ics.hyracks.data.std.util.GrowableArray;
 import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.AbstractUTF8Token;
 import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizer;
 import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.HashedUTF8WordTokenFactory;
@@ -127,14 +128,13 @@
             tokenizer.next();
 
             // serialize token
-            ByteArrayOutputStream tokenBaos = new ByteArrayOutputStream();
-            DataOutput tokenDos = new DataOutputStream(tokenBaos);
+            GrowableArray tokenStorage = new GrowableArray();
 
             IToken token = tokenizer.getToken();
-            token.serializeToken(tokenDos);
+            token.serializeToken(tokenStorage);
 
             // deserialize token
-            ByteArrayInputStream bais = new ByteArrayInputStream(tokenBaos.toByteArray());
+            ByteArrayInputStream bais = new ByteArrayInputStream(tokenStorage.getByteArray());
             DataInput in = new DataInputStream(bais);
 
             Integer hashedToken = in.readInt();
@@ -159,14 +159,13 @@
             tokenizer.next();
 
             // serialize token
-            ByteArrayOutputStream tokenBaos = new ByteArrayOutputStream();
-            DataOutput tokenDos = new DataOutputStream(tokenBaos);
+            GrowableArray tokenStorage = new GrowableArray();
 
             IToken token = tokenizer.getToken();
-            token.serializeToken(tokenDos);
+            token.serializeToken(tokenStorage);
 
             // deserialize token
-            ByteArrayInputStream bais = new ByteArrayInputStream(tokenBaos.toByteArray());
+            ByteArrayInputStream bais = new ByteArrayInputStream(tokenStorage.getByteArray());
             DataInput in = new DataInputStream(bais);
 
             Integer hashedToken = in.readInt();
@@ -191,14 +190,13 @@
             tokenizer.next();
 
             // serialize hashed token
-            ByteArrayOutputStream tokenBaos = new ByteArrayOutputStream();
-            DataOutput tokenDos = new DataOutputStream(tokenBaos);
+            GrowableArray tokenStorage = new GrowableArray();
 
             IToken token = tokenizer.getToken();
-            token.serializeToken(tokenDos);
+            token.serializeToken(tokenStorage);
 
             // deserialize token
-            ByteArrayInputStream bais = new ByteArrayInputStream(tokenBaos.toByteArray());
+            ByteArrayInputStream bais = new ByteArrayInputStream(tokenStorage.getByteArray());
             DataInput in = new DataInputStream(bais);
 
             String strToken = in.readUTF();
diff --git a/hyracks/pom.xml b/hyracks/pom.xml
index 570421e..26bd647 100644
--- a/hyracks/pom.xml
+++ b/hyracks/pom.xml
@@ -11,21 +11,6 @@
     <jvm.extraargs />
   </properties>
 
-  <profiles>
-    <profile>
-      <id>macosx</id>
-      <activation>
-        <os>
-          <name>mac os x</name>
-        </os>
-        <jdk>1.7</jdk>
-      </activation>
-      <properties>
-        <jvm.extraargs>-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.KQueueSelectorProvider</jvm.extraargs>
-      </properties>
-    </profile>
-  </profiles>
-
   <build>
     <plugins>
       <plugin>