SQL++ support in Algebricks:
1. added the OuterUnnestOperator;
2. fixed several rewriting rules.

Change-Id: I7dcf57f75ebc0a741b6ec9597525e226b6014fc0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/314
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFixpointRuleController.java b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFixpointRuleController.java
index 440af4b..b6d6d61 100644
--- a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFixpointRuleController.java
+++ b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/rewriter/rulecontrollers/SequentialFixpointRuleController.java
@@ -59,5 +59,4 @@
         } while (anyChange);
         return anyRuleFired;
     }
-
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 6668536..5388092 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -48,6 +48,7 @@
     TOKENIZE,
     UNIONALL,
     UNNEST,
+    OUTER_UNNEST,
     UNNEST_MAP,
     UPDATE,
     WRITE,
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/BroadcastExpressionAnnotation.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/BroadcastExpressionAnnotation.java
index 07124bf..6c47b82 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/BroadcastExpressionAnnotation.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/BroadcastExpressionAnnotation.java
@@ -46,4 +46,9 @@
         return bcast;
     }
 
+    @Override
+    public String toString() {
+        return BROADCAST_ANNOTATION_KEY;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
index eaeeac7..a38a96c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
@@ -41,4 +41,9 @@
         clone.setObject(object);
         return clone;
     }
+
+    @Override
+    public String toString() {
+        return INDEXED_NL_JOIN_ANNOTATION_KEY;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 79452c8..cfb64d4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -24,7 +24,6 @@
 import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -76,6 +75,7 @@
     @Override
     public abstract LogicalOperatorTag getOperatorTag();
 
+    @Override
     public ExecutionMode getExecutionMode() {
         return mode;
     }
@@ -154,7 +154,7 @@
     @Override
     public final void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         if (bJobGenEnabled) {
             if (physicalOperator == null) {
                 throw new AlgebricksException("Physical operator not set for operator: " + this);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
new file mode 100644
index 0000000..c04957c
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
+
+public abstract class AbstractUnnestNonMapOperator extends AbstractUnnestOperator {
+
+    protected LogicalVariable positionalVariable;
+
+    /**
+     * Used to set the position offset for positional variable
+     */
+    protected ILogicalExpression positionOffsetExpr;
+
+    /**
+     * Specify the writer of the positional variable
+     */
+    protected IUnnestingPositionWriter positionWriter;
+
+    /**
+     * Specify the type of the positional variable
+     */
+    protected Object positionalVariableType;
+
+    public AbstractUnnestNonMapOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
+        super(makeSingletonList(variable), expression);
+    }
+
+    public AbstractUnnestNonMapOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression,
+            LogicalVariable positionalVariable, Object positionalVariableType,
+            IUnnestingPositionWriter positionWriter) {
+        this(variable, expression);
+        this.setPositionalVariable(positionalVariable);
+        this.setPositionalVariableType(positionalVariableType);
+        this.setPositionWriter(positionWriter);
+    }
+
+    public LogicalVariable getVariable() {
+        return variables.get(0);
+    }
+
+    public void setPositionalVariable(LogicalVariable positionalVariable) {
+        this.positionalVariable = positionalVariable;
+    }
+
+    public LogicalVariable getPositionalVariable() {
+        return positionalVariable;
+    }
+
+    public void setPositionWriter(IUnnestingPositionWriter positionWriter) {
+        this.positionWriter = positionWriter;
+    }
+
+    public IUnnestingPositionWriter getPositionWriter() {
+        return positionalVariable != null ? positionWriter : null;
+    }
+
+    public void setPositionalVariableType(Object positionalVariableType) {
+        this.positionalVariableType = positionalVariableType;
+    }
+
+    public Object getPositionalVariableType() {
+        return positionalVariableType;
+    }
+
+    public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
+        this.positionOffsetExpr = posOffsetExpr;
+    }
+
+    public ILogicalExpression getPositionOffsetExpr() {
+        return this.positionOffsetExpr;
+    }
+
+    protected static <E> ArrayList<E> makeSingletonList(E item) {
+        ArrayList<E> array = new ArrayList<E>(1);
+        array.add(item);
+        return array;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+                    throws AlgebricksException {
+                if (sources.length > 0) {
+                    target.addAllVariables(sources[0]);
+                }
+                for (LogicalVariable v : variables) {
+                    target.addVariable(v);
+                }
+                if (positionalVariable != null) {
+                    target.addVariable(positionalVariable);
+                }
+            }
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OuterUnnestOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OuterUnnestOperator.java
new file mode 100644
index 0000000..133656b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/OuterUnnestOperator.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
+
+public class OuterUnnestOperator extends AbstractUnnestNonMapOperator {
+
+    public OuterUnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
+        super(variable, expression);
+    }
+
+    public OuterUnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression,
+            LogicalVariable positionalVariable, Object positionalVariableType,
+            IUnnestingPositionWriter positionWriter) {
+        super(variable, expression, positionalVariable, positionalVariableType, positionWriter);
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitOuterUnnestOperator(this, arg);
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
+        Object t = env.getType(expression.getValue());
+        env.setVarType(variables.get(0), t);
+        if (positionalVariable != null) {
+            env.setVarType(positionalVariable, positionalVariableType);
+        }
+
+        // The produced variables of the this operator are nullable because of the left outer semantics.
+        List<LogicalVariable> nullableVars = new ArrayList<LogicalVariable>();
+        nullableVars.add(variables.get(0));
+        if (positionalVariable != null) {
+            nullableVars.add(positionalVariable);
+        }
+        env.getCorrelatedNullableVariableLists().add(nullableVars);
+        return env;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.OUTER_UNNEST;
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java
index bfd2df7..af85fdd 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java
@@ -19,10 +19,8 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
 import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
@@ -30,7 +28,6 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer;
 import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -44,14 +41,15 @@
         for (int i = 0; i < inputs.size(); i++) {
             for (LogicalVariable v : inputs.get(i).getValue().getSchema()) {
                 if (!schema.contains(v))
-                	schema.add(v);
+                    schema.add(v);
             }
 
         }
     }
 
     @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
         return false;
     }
 
@@ -72,13 +70,13 @@
 
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-      ITypeEnvPointer[] envPointers = new ITypeEnvPointer[inputs.size()];
-      for (int i = 0; i < inputs.size(); i++) {
-          envPointers[i] = new OpRefTypeEnvPointer(inputs.get(i), ctx);
-      }
-      PropagatingTypeEnvironment env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
-              ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
-      return env;
+        ITypeEnvPointer[] envPointers = new ITypeEnvPointer[inputs.size()];
+        for (int i = 0; i < inputs.size(); i++) {
+            envPointers[i] = new OpRefTypeEnvPointer(inputs.get(i), ctx);
+        }
+        PropagatingTypeEnvironment env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
+                ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
+        return env;
 
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index aded8a9..1ee0942 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
-import java.util.ArrayList;
-
 import org.apache.commons.lang3.mutable.Mutable;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -27,81 +25,19 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
 
-public class UnnestOperator extends AbstractUnnestOperator {
-
-    private LogicalVariable positionalVariable;
-
-    /**
-     * Used to set the position offset for positional variable
-     */
-    private ILogicalExpression positionOffsetExpr;
-
-    /**
-     * Specify the writer of the positional variable
-     */
-    private IUnnestingPositionWriter positionWriter;
-
-    /**
-     * Specify the type of the positional variable
-     */
-    private Object positionalVariableType;
+public class UnnestOperator extends AbstractUnnestNonMapOperator {
 
     public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
-        super(makeSingletonList(variable), expression);
+        super(variable, expression);
     }
 
     public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression,
             LogicalVariable positionalVariable, Object positionalVariableType, IUnnestingPositionWriter positionWriter) {
-        this(variable, expression);
-        this.setPositionalVariable(positionalVariable);
-        this.setPositionalVariableType(positionalVariableType);
-        this.setPositionWriter(positionWriter);
-    }
-
-    @Override
-    public LogicalOperatorTag getOperatorTag() {
-        return LogicalOperatorTag.UNNEST;
-    }
-
-    public LogicalVariable getVariable() {
-        return variables.get(0);
-    }
-
-    public void setPositionalVariable(LogicalVariable positionalVariable) {
-        this.positionalVariable = positionalVariable;
-    }
-
-    public LogicalVariable getPositionalVariable() {
-        return positionalVariable;
-    }
-
-    public void setPositionWriter(IUnnestingPositionWriter positionWriter) {
-        this.positionWriter = positionWriter;
-    }
-
-    public IUnnestingPositionWriter getPositionWriter() {
-        return positionalVariable != null ? positionWriter : null;
-    }
-
-    public void setPositionalVariableType(Object positionalVariableType) {
-        this.positionalVariableType = positionalVariableType;
-    }
-
-    public Object getPositionalVariableType() {
-        return positionalVariableType;
-    }
-
-    public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
-        this.positionOffsetExpr = posOffsetExpr;
-    }
-
-    public ILogicalExpression getPositionOffsetExpr() {
-        return this.positionOffsetExpr;
+        super(variable, expression, positionalVariable, positionalVariableType, positionWriter);
     }
 
     @Override
@@ -109,12 +45,6 @@
         return visitor.visitUnnestOperator(this, arg);
     }
 
-    private static <E> ArrayList<E> makeSingletonList(E item) {
-        ArrayList<E> array = new ArrayList<E>(1);
-        array.add(item);
-        return array;
-    }
-
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         IVariableTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
@@ -127,22 +57,7 @@
     }
 
     @Override
-    public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return new VariablePropagationPolicy() {
-
-            @Override
-            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
-                    throws AlgebricksException {
-                if (sources.length > 0) {
-                    target.addAllVariables(sources[0]);
-                }
-                for (LogicalVariable v : variables) {
-                    target.addVariable(v);
-                }
-                if (positionalVariable != null) {
-                    target.addVariable(positionalVariable);
-                }
-            }
-        };
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.UNNEST;
     }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 6d00fab..25993bb 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -29,7 +29,6 @@
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -65,6 +64,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -311,9 +311,9 @@
             }
         }
         if (changed) {
-            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Group-by list changed from "
-                    + GroupByOperator.veListToString(gByList) + " to " + GroupByOperator.veListToString(newGbyList)
-                    + ".\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER
+                    .fine(">>>> Group-by list changed from " + GroupByOperator.veListToString(gByList) + " to "
+                            + GroupByOperator.veListToString(newGbyList) + ".\n");
         }
         gByList.clear();
         gByList.addAll(newGbyList);
@@ -509,7 +509,8 @@
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext ctx) throws AlgebricksException {
+    public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;
     }
@@ -574,7 +575,7 @@
     /***
      * Propagated equivalent classes from the child to the current operator, based
      * on the used variables of the current operator.
-     * 
+     *
      * @param op
      *            , the current operator
      * @param ctx
@@ -740,7 +741,7 @@
     /**
      * Propagate equivalences that carried in expressions to the variables that
      * they are assigned to.
-     * 
+     *
      * @param eqClasses
      *            an equivalent class map
      * @param assignExprs
@@ -753,15 +754,24 @@
         for (int assignVarIndex = 0; assignVarIndex < assignVars.size(); ++assignVarIndex) {
             LogicalVariable var = assignVars.get(assignVarIndex);
             ILogicalExpression expr = assignExprs.get(assignVarIndex).getValue();
+            Map<LogicalVariable, EquivalenceClass> newVarEqcMap = new HashMap<LogicalVariable, EquivalenceClass>();
             for (Entry<LogicalVariable, EquivalenceClass> entry : eqClasses.entrySet()) {
                 EquivalenceClass eqc = entry.getValue();
                 // If the equivalence class contains the right-hand-side expression,
                 // the left-hand-side variable is added into the equivalence class.
                 if (eqc.contains(expr)) {
                     eqc.addMember(var);
+                    newVarEqcMap.put(var, eqc); // Add var as a map key for the equivalence class.
                 }
             }
+            eqClasses.putAll(newVarEqcMap);
         }
     }
 
-}
\ No newline at end of file
+    @Override
+    public Void visitOuterUnnestOperator(OuterUnnestOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index a982ed4..dc535ea 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -54,6 +54,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -576,4 +577,18 @@
             return false;
     }
 
+    @Override
+    public Boolean visitOuterUnnestOperator(OuterUnnestOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.OUTER_UNNEST)
+            return Boolean.FALSE;
+        OuterUnnestOperator unnestOpArg = (OuterUnnestOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getVariables(), unnestOpArg.getVariables())
+                && variableEqual(op.getPositionalVariable(), unnestOpArg.getPositionalVariable());
+        if (!isomorphic)
+            return Boolean.FALSE;
+        isomorphic = op.getExpressionRef().getValue().equals(unnestOpArg.getExpressionRef().getValue());
+        return isomorphic;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 4f0bcd3..376c2bc 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -54,6 +54,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -448,4 +449,10 @@
         return null;
     }
 
+    @Override
+    public Void visitOuterUnnestOperator(OuterUnnestOperator op, ILogicalOperator arg) throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 32dbc94..8e378bc 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -46,6 +45,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -80,8 +80,8 @@
         }
         op.accept(visitor, context);
         if (AlgebricksConfig.DEBUG) {
-            AlgebricksConfig.ALGEBRICKS_LOGGER.finest("Logical properties visitor for " + op + ": "
-                    + context.getLogicalPropertiesVector(op) + "\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(
+                    "Logical properties visitor for " + op + ": " + context.getLogicalPropertiesVector(op) + "\n");
         }
     }
 
@@ -259,7 +259,8 @@
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext arg) throws AlgebricksException {
+    public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext arg)
+            throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -272,8 +273,7 @@
     }
 
     @Override
-    public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext arg)
-            throws AlgebricksException {
+    public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -351,4 +351,9 @@
         return null;
     }
 
+    @Override
+    public Void visitOuterUnnestOperator(OuterUnnestOperator op, IOptimizationContext arg) throws AlgebricksException {
+        return null;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index afc832a..74e74a6 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -24,7 +24,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -52,6 +51,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -115,20 +115,20 @@
 
     @Override
     public ILogicalOperator visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
-        return new LimitOperator(deepCopyExpressionRef(op.getMaxObjects()).getValue(), deepCopyExpressionRef(
-                op.getOffset()).getValue(), op.isTopmostLimitOp());
+        return new LimitOperator(deepCopyExpressionRef(op.getMaxObjects()).getValue(),
+                deepCopyExpressionRef(op.getOffset()).getValue(), op.isTopmostLimitOp());
     }
 
     @Override
     public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
-        return new InnerJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op.getInputs()
-                .get(1));
+        return new InnerJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0),
+                op.getInputs().get(1));
     }
 
     @Override
     public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
-        return new LeftOuterJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
-                .getInputs().get(1));
+        return new LeftOuterJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0),
+                op.getInputs().get(1));
     }
 
     @Override
@@ -215,8 +215,8 @@
     public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
         ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
         newInputList.addAll(op.getVariables());
-        return new UnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()), new ArrayList<Object>(
-                op.getVariableTypes()), op.propagatesInput());
+        return new UnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
+                new ArrayList<Object>(op.getVariableTypes()), op.propagatesInput());
     }
 
     @Override
@@ -272,7 +272,8 @@
         List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
         deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
         InsertDeleteOperator insertDeleteOp = new InsertDeleteOperator(op.getDataSource(),
-                deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(), op.isBulkload());
+                deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(),
+                op.isBulkload());
         insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
         return insertDeleteOp;
     }
@@ -322,12 +323,13 @@
     private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
             List<Mutable<ILogicalExpression>> oldExprs) {
         for (Mutable<ILogicalExpression> oldExpr : oldExprs)
-            newExprs.add(new MutableObject<ILogicalExpression>(((AbstractLogicalExpression) oldExpr.getValue())
-                    .cloneExpression()));
+            newExprs.add(new MutableObject<ILogicalExpression>(
+                    ((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression()));
     }
 
     private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExpr) {
-        return new MutableObject<ILogicalExpression>(((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression());
+        return new MutableObject<ILogicalExpression>(
+                ((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression());
     }
 
     private List<LogicalVariable> deepCopyVars(List<LogicalVariable> newVars, List<LogicalVariable> oldVars) {
@@ -346,8 +348,8 @@
             List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
         List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
         for (Pair<IOrder, Mutable<ILogicalExpression>> pair : ordersAndExprs)
-            newOrdersAndExprs.add(new Pair<IOrder, Mutable<ILogicalExpression>>(pair.first,
-                    deepCopyExpressionRef(pair.second)));
+            newOrdersAndExprs
+                    .add(new Pair<IOrder, Mutable<ILogicalExpression>>(pair.first, deepCopyExpressionRef(pair.second)));
         return newOrdersAndExprs;
     }
 
@@ -369,4 +371,10 @@
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return new MaterializeOperator();
     }
+
+    @Override
+    public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+        return new OuterUnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
+                op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 8383ef2..da5d2b2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestNonMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -49,6 +50,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -210,13 +212,7 @@
 
     @Override
     public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
-        producedVariables.addAll(op.getVariables());
-        LogicalVariable positionalVariable = op.getPositionalVariable();
-        if (positionalVariable != null) {
-            if (!producedVariables.contains(positionalVariable))
-                producedVariables.add(positionalVariable);
-        }
-        return null;
+        return visitUnnestNonMapOperator(op);
     }
 
     @Override
@@ -276,4 +272,19 @@
         producedVariables.add(op.getVariables().get(0));
         return null;
     }
+
+    @Override
+    public Void visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+        return visitUnnestNonMapOperator(op);
+    }
+
+    private Void visitUnnestNonMapOperator(AbstractUnnestNonMapOperator op) {
+        producedVariables.addAll(op.getVariables());
+        LogicalVariable positionalVariable = op.getPositionalVariable();
+        if (positionalVariable != null) {
+            if (!producedVariables.contains(positionalVariable))
+                producedVariables.add(positionalVariable);
+        }
+        return null;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 29b7edd..dc89c12 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -50,6 +50,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -331,4 +332,10 @@
         return null;
     }
 
+    @Override
+    public Void visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+        standardLayout(op);
+        return null;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 77e6cb6..2268a9e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestNonMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -51,6 +52,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -322,16 +324,7 @@
     @Override
     public Void visitUnnestOperator(UnnestOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        List<LogicalVariable> variables = op.getVariables();
-        for (int i = 0; i < variables.size(); i++) {
-            if (variables.get(i) == pair.first) {
-                variables.set(i, pair.second);
-                return null;
-            }
-        }
-        op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
-        substVarTypes(op, pair);
-        return null;
+        return visitUnnestNonMapOperator(op, pair);
     }
 
     @Override
@@ -470,4 +463,24 @@
         substVarTypes(op, pair);
         return null;
     }
+
+    @Override
+    public Void visitOuterUnnestOperator(OuterUnnestOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        return visitUnnestNonMapOperator(op, pair);
+    }
+
+    private Void visitUnnestNonMapOperator(AbstractUnnestNonMapOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<LogicalVariable> variables = op.getVariables();
+        for (int i = 0; i < variables.size(); i++) {
+            if (variables.get(i) == pair.first) {
+                variables.set(i, pair.second);
+                return null;
+            }
+        }
+        op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index bfb9036..f80e1cd 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -50,6 +50,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -408,4 +409,10 @@
         return null;
     }
 
+    @Override
+    public Void visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+        op.getExpressionRef().getValue().getUsedVariables(usedVariables);
+        return null;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 1404a47..17f2285 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -48,6 +47,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -86,7 +86,8 @@
     }
 
     @Override
-    public String visitRunningAggregateOperator(RunningAggregateOperator op, Integer indent) throws AlgebricksException {
+    public String visitRunningAggregateOperator(RunningAggregateOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("running-aggregate ").append(op.getVariables()).append(" <- ");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -185,7 +186,8 @@
     }
 
     @Override
-    public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) throws AlgebricksException {
+    public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("distribute result ");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -256,11 +258,21 @@
     }
 
     @Override
+    public String visitOuterUnnestOperator(OuterUnnestOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("outer-unnest " + op.getVariable());
+        if (op.getPositionalVariable() != null) {
+            buffer.append(" at " + op.getPositionalVariable());
+        }
+        buffer.append(" <- " + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+        return buffer.toString();
+    }
+
+    @Override
     public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append(
-                "unnest-map " + op.getVariables() + " <- "
-                        + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+        addIndent(buffer, indent).append("unnest-map " + op.getVariables() + " <- "
+                + op.getExpressionRef().getValue().accept(exprVisitor, indent));
         return buffer.toString();
     }
 
@@ -293,8 +305,8 @@
     @Override
     public String visitScriptOperator(ScriptOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append(
-                "script (in: " + op.getInputVariables() + ") (out: " + op.getOutputVariables() + ")");
+        addIndent(buffer, indent)
+                .append("script (in: " + op.getInputVariables() + ") (out: " + op.getOutputVariables() + ")");
         return buffer.toString();
     }
 
@@ -427,8 +439,8 @@
     public String visitExternalDataLookupOperator(ExternalDataLookupOperator op, Integer indent)
             throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append(
-                "external-instant-lookup " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
+        addIndent(buffer, indent)
+                .append("external-instant-lookup " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
         return buffer.toString();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java
index 612dce2..fa9ad05 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java
@@ -36,7 +36,8 @@
             for (ITypeEnvPointer p : typeEnvs) {
                 IVariableTypeEnvironment env = p.getTypeEnv();
                 if (env == null) {
-                    throw new AlgebricksException("Null environment for pointer " + p + " in getVarType for var=" + var);
+                    throw new AlgebricksException(
+                            "Null environment for pointer " + p + " in getVarType for var=" + var);
                 }
                 Object t = env.getVarType(var, nonNullVariableList, correlatedNullableVariableLists);
                 if (t != null) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 423a0a4..285812c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -96,6 +97,8 @@
 
     public R visitUnnestOperator(UnnestOperator op, T arg) throws AlgebricksException;
 
+    public R visitOuterUnnestOperator(OuterUnnestOperator op, T arg) throws AlgebricksException;
+
     public R visitUnnestMapOperator(UnnestMapOperator op, T arg) throws AlgebricksException;
 
     public R visitDataScanOperator(DataSourceScanOperator op, T arg) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
index e3695fd..5adb566 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -22,7 +22,6 @@
 import java.util.logging.Level;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -46,7 +45,7 @@
 
     /**
      * Each rewriting strategy may differ in the
-     * 
+     *
      * @param root
      * @param ruleClasses
      * @return true iff one of the rules in the collection fired
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
index ad68348..83740de 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.algebricks.core.rewriter.base;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
deleted file mode 100644
index 959f887..0000000
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.rewriter.rules;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class IntroHashPartitionMergeExchange implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
-        if (op1.getPhysicalOperator() == null
-                || (op1.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_EXCHANGE && op1
-                        .getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE)) {
-            return false;
-        }
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
-        if (op2.getPhysicalOperator() == null
-                || op2.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
-            return false;
-        }
-        if (op1.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE) {
-            // if it is a hash_partition_merge_exchange, the sort_merge_exchange can be simply removed
-            op1.getInputs().get(0).setValue(op2.getInputs().get(0).getValue());
-            op1.computeDeliveredPhysicalProperties(context);
-            return true;
-        }
-        HashPartitionExchangePOperator hpe = (HashPartitionExchangePOperator) op1.getPhysicalOperator();
-        SortMergeExchangePOperator sme = (SortMergeExchangePOperator) op2.getPhysicalOperator();
-        List<OrderColumn> ocList = new ArrayList<OrderColumn>();
-        for (OrderColumn oc : sme.getSortColumns()) {
-            ocList.add(oc);
-        }
-        HashPartitionMergeExchangePOperator hpme = new HashPartitionMergeExchangePOperator(ocList, hpe.getHashFields(),
-                hpe.getDomain());
-        op1.setPhysicalOperator(hpme);
-        op1.getInputs().get(0).setValue(op2.getInputs().get(0).getValue());
-        op1.computeDeliveredPhysicalProperties(context);
-        return true;
-    }
-
-}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java
index 113a06e..f8a93b0 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java
@@ -25,7 +25,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -37,20 +36,21 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
  * replace Subplan operators with nested loop joins where the join condition is true, if the Subplan
  * does not contain free variables (does not have correlations to the input stream).
- * 
+ *
  * @author yingyib
  */
 public class NestedSubplanToJoinRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -103,17 +103,20 @@
                 continue;
             }
 
-            /** expend the input and roots into a DAG of nested loop joins */
+            /**
+             * Expends the input and roots into a DAG of nested loop joins.
+             * Though joins should be left-outer joins, a left-outer join with condition TRUE is equivalent to an inner join.
+             **/
             Mutable<ILogicalExpression> expr = new MutableObject<ILogicalExpression>(ConstantExpression.TRUE);
             Mutable<ILogicalOperator> nestedRootRef = nestedRoots.get(0);
-            ILogicalOperator join = new LeftOuterJoinOperator(expr, new MutableObject<ILogicalOperator>(subplanInput),
+            ILogicalOperator join = new InnerJoinOperator(expr, new MutableObject<ILogicalOperator>(subplanInput),
                     nestedRootRef);
 
             /** rewrite the nested tuple source to be empty tuple source */
-            rewriteNestedTupleSource(nestedRootRef);
+            rewriteNestedTupleSource(nestedRootRef, context);
 
             for (int i = 1; i < nestedRoots.size(); i++) {
-                join = new LeftOuterJoinOperator(expr, new MutableObject<ILogicalOperator>(join), nestedRoots.get(i));
+                join = new InnerJoinOperator(expr, new MutableObject<ILogicalOperator>(join), nestedRoots.get(i));
             }
             op1.getInputs().get(index).setValue(join);
             context.computeAndSetTypeEnvironmentForOperator(join);
@@ -124,17 +127,20 @@
 
     /**
      * rewrite NestedTupleSource operators to EmptyTupleSource operators
-     * 
+     *
      * @param nestedRootRef
      */
-    private void rewriteNestedTupleSource(Mutable<ILogicalOperator> nestedRootRef) {
+    private void rewriteNestedTupleSource(Mutable<ILogicalOperator> nestedRootRef, IOptimizationContext context)
+            throws AlgebricksException {
         AbstractLogicalOperator nestedRoot = (AbstractLogicalOperator) nestedRootRef.getValue();
         if (nestedRoot.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
-            nestedRootRef.setValue(new EmptyTupleSourceOperator());
+            ILogicalOperator ets = new EmptyTupleSourceOperator();
+            nestedRootRef.setValue(ets);
+            context.computeAndSetTypeEnvironmentForOperator(ets);
         }
         List<Mutable<ILogicalOperator>> inputs = nestedRoot.getInputs();
         for (Mutable<ILogicalOperator> input : inputs) {
-            rewriteNestedTupleSource(input);
+            rewriteNestedTupleSource(input, context);
         }
     }
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSortDownRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSortDownRule.java
new file mode 100644
index 0000000..7a7003b
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSortDownRule.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule pushes down the sort operator
+ * as much as possible to where the sort keys are available.
+ * The rule pushes the sort operator down one-step-at-a-time.
+ */
+public class PushSortDownRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator operator = opRef.getValue();
+        if (operator.getOperatorTag() != LogicalOperatorTag.ORDER) {
+            return false;
+        }
+
+        // Gets used variables in the sort operator.
+        OrderOperator orderOperator = (OrderOperator) operator;
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> orderKeys = orderOperator.getOrderExpressions();
+        Set<LogicalVariable> orderUsedVars = new HashSet<LogicalVariable>();
+        for (Pair<IOrder, Mutable<ILogicalExpression>> orderKey : orderKeys) {
+            orderKey.second.getValue().getUsedVariables(orderUsedVars);
+        }
+        Mutable<ILogicalOperator> inputOpRef = orderOperator.getInputs().get(0);
+        ILogicalOperator inputOperator = inputOpRef.getValue();
+
+        // Only pushes sort through assign:
+        // 1. Blocking operators like sort/group/join cannot be pushed through.
+        // 2. Data reducing operators like select/project should not be pushed through.
+        // 3. Order-destroying operator like unnest/unnest-map cannot be pushed through.
+        if (inputOperator.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        Set<LogicalVariable> inputProducedVars = new HashSet<LogicalVariable>();
+        VariableUtilities.getProducedVariables(inputOperator, inputProducedVars);
+
+        // Intersects used variables in the sort and variables produced by inputOperator.
+        orderUsedVars.retainAll(inputProducedVars);
+        if (!orderUsedVars.isEmpty()) {
+            // If the sort uses any variable that is produced by this operator.
+            return false;
+        }
+
+        // Switches sort and its input operator.
+        opRef.setValue(inputOperator);
+        inputOpRef.setValue(inputOperator.getInputs().get(0).getValue());
+        inputOperator.getInputs().get(0).setValue(orderOperator);
+
+        // Re-computes the type environments.
+        context.computeAndSetTypeEnvironmentForOperator(orderOperator);
+        context.computeAndSetTypeEnvironmentForOperator(inputOperator);
+        return true;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
new file mode 100644
index 0000000..84d7c9d
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class RemoveUnnecessarySortMergeExchange implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getPhysicalOperator() == null
+                || (op1.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_EXCHANGE && op1
+                        .getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE)) {
+            return false;
+        }
+        Mutable<ILogicalOperator> currentOpRef = op1.getInputs().get(0);
+        AbstractLogicalOperator currentOp = (AbstractLogicalOperator) currentOpRef.getValue();
+
+        // Goes down the pipeline to find a qualified SortMergeExchange to eliminate.
+        while (currentOp != null) {
+            IPhysicalOperator physicalOp = currentOp.getPhysicalOperator();
+            if (physicalOp == null) {
+                return false;
+            } else if (physicalOp.getOperatorTag() == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
+                break;
+            } else if (!currentOp.isMap() || currentOp.getOperatorTag() == LogicalOperatorTag.UNNEST
+                    || currentOp.getOperatorTag() == LogicalOperatorTag.LIMIT) {
+                // Do not eliminate sort-merge below input order-sensitive operators.
+                // TODO(buyingyi): once Taewoo merges his limit-push down change,
+                // we need to use his new property in logical operator to check order sensitivity.
+                return false;
+            } else if (currentOp.getInputs().size() == 1) {
+                currentOpRef = currentOp.getInputs().get(0);
+                currentOp = (AbstractLogicalOperator) currentOpRef.getValue();
+            } else {
+                currentOp = null;
+            }
+        }
+        if (currentOp == null) {
+            // There is no such qualified SortMergeExchange.
+            return false;
+        }
+
+        if (op1.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE) {
+            // If op1 is a hash_partition_merge_exchange, the sort_merge_exchange can be simply removed.
+            currentOpRef.setValue(currentOp.getInputs().get(0).getValue());
+            op1.computeDeliveredPhysicalProperties(context);
+            return true;
+        }
+
+        // Checks whether sort columns in the SortMergeExchange are still available at op1.
+        // If yes, we use HashMergeExchange; otherwise, we use HashExchange.
+        SortMergeExchangePOperator sme = (SortMergeExchangePOperator) currentOp.getPhysicalOperator();
+        HashPartitionExchangePOperator hpe = (HashPartitionExchangePOperator) op1.getPhysicalOperator();
+        Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+        VariableUtilities.getLiveVariables(op1, liveVars);
+        boolean usingHashMergeExchange = true;
+        for (OrderColumn oc : sme.getSortColumns()) {
+            if (!liveVars.contains(oc.getColumn())) {
+                usingHashMergeExchange = false;
+            }
+        }
+
+        if (usingHashMergeExchange) {
+            // Add sort columns from the SortMergeExchange into a new HashMergeExchange.
+            List<OrderColumn> ocList = new ArrayList<OrderColumn>();
+            for (OrderColumn oc : sme.getSortColumns()) {
+                ocList.add(oc);
+            }
+            HashPartitionMergeExchangePOperator hpme = new HashPartitionMergeExchangePOperator(ocList,
+                    hpe.getHashFields(), hpe.getDomain());
+            op1.setPhysicalOperator(hpme);
+        }
+
+        // Remove the SortMergeExchange op.
+        currentOpRef.setValue(currentOp.getInputs().get(0).getValue());
+
+        // Re-compute delivered properties at op1.
+        op1.computeDeliveredPhysicalProperties(context);
+        return true;
+    }
+
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
index 134e6fb..50b4ea9 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
@@ -25,7 +25,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -100,10 +99,11 @@
 
         /** join the two independent branches */
         InnerJoinOperator join = new InnerJoinOperator(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE),
-                new MutableObject<ILogicalOperator>(boundaryOpRef.getValue()), new MutableObject<ILogicalOperator>(
-                        opRef.getValue()));
+                new MutableObject<ILogicalOperator>(boundaryOpRef.getValue()),
+                new MutableObject<ILogicalOperator>(opRef.getValue()));
         opRef.setValue(join);
         ILogicalOperator ets = new EmptyTupleSourceOperator();
+        context.computeAndSetTypeEnvironmentForOperator(ets);
         boundaryOpRef.setValue(ets);
         context.computeAndSetTypeEnvironmentForOperator(boundaryOpRef.getValue());
         context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());