cross merge fullstack_release_candidate into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk/fullstack@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/pom.xml b/algebricks/algebricks-core/pom.xml
index a74a540..def5b35 100644
--- a/algebricks/algebricks-core/pom.xml
+++ b/algebricks/algebricks-core/pom.xml
@@ -16,8 +16,9 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
+          <fork>true</fork>
         </configuration>
       </plugin>
     </plugins>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 165fccd..6701385 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -89,4 +89,9 @@
     public IPhysicalPropertiesVector getDeliveredPhysicalProperties();
 
     public void computeDeliveredPhysicalProperties(IOptimizationContext context) throws AlgebricksException;
+    
+    /**
+     * Indicates whether the expressions used by this operator must be variable reference expressions.
+     */
+    public boolean requiresVariableReferenceExpressions();
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 468d25c..0aa1ff6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -51,6 +51,8 @@
      * returns true if op1 and op2 have already been compared
      */
     public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
+    
+    public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
 
     public abstract void addNotToBeInlinedVar(LogicalVariable var);
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 5234d2c..b8bdf3e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -20,6 +20,7 @@
     CLUSTER,
     DATASOURCESCAN,
     DISTINCT,
+    DISTRIBUTE_RESULT,
     GROUP,
     EMPTYTUPLESOURCE,
     EXCHANGE,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index a969372..32cfb9a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -7,6 +7,7 @@
     BTREE_SEARCH,
     STATS,
     DATASOURCE_SCAN,
+    DISTRIBUTE_RESULT,
     EMPTY_TUPLE_SOURCE,
     EXTERNAL_GROUP_BY,
     IN_MEMORY_HASH_JOIN,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
index bf3f82b..b284b22 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
@@ -39,9 +39,11 @@
 
     @Override
     public ScalarFunctionCallExpression cloneExpression() {
-        cloneAnnotations();
         List<Mutable<ILogicalExpression>> clonedArgs = cloneArguments();
-        return new ScalarFunctionCallExpression(finfo, clonedArgs);
+        ScalarFunctionCallExpression funcExpr = new ScalarFunctionCallExpression(finfo, clonedArgs);
+        funcExpr.getAnnotations().putAll(cloneAnnotations());
+        funcExpr.setOpaqueParameters(this.getOpaqueParameters());
+        return funcExpr;
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
index 71932d8..652f9b0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
@@ -45,6 +45,7 @@
         List<Mutable<ILogicalExpression>> clonedArgs = cloneArguments();
         UnnestingFunctionCallExpression ufce = new UnnestingFunctionCallExpression(finfo, clonedArgs);
         ufce.setReturnsUniqueValues(returnsUniqueValues);
+        ufce.setOpaqueParameters(this.getOpaqueParameters());
         return ufce;
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 899b633..82187e3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -51,6 +51,10 @@
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
             throws AlgebricksException;
 
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException;
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
             JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index dc0edfe..64dbdef 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -182,4 +182,9 @@
         return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getNullableTypeComputer(),
                 ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
     }
+    
+    @Override
+    public boolean requiresVariableReferenceExpressions() {
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
index a4dc2e0..4543997 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
@@ -89,4 +89,8 @@
         return env;
     }
 
+    @Override
+    public boolean requiresVariableReferenceExpressions() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
index 3227f3d..3c21c8f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
@@ -106,5 +106,4 @@
         }
         return env;
     }
-
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
index 20aa574..03bfcba 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
@@ -81,4 +81,8 @@
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
 
+    @Override
+    public boolean requiresVariableReferenceExpressions() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
index ee0dcf6..b1da831 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;

 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;

 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;

+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;

 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;

 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;

 

@@ -49,12 +50,34 @@
 

     @Override

     public void recomputeSchema() {

-        schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());

+        if (schema == null) {

+            schema = new ArrayList<LogicalVariable>();

+        }

+        schema.clear();

+        for (Mutable<ILogicalExpression> eRef : expressions) {

+            ILogicalExpression e = eRef.getValue();

+            if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                VariableReferenceExpression v = (VariableReferenceExpression) e;

+                schema.add(v.getVariableReference());

+            }

+        }

     }

 

     @Override

     public VariablePropagationPolicy getVariablePropagationPolicy() {

-        return VariablePropagationPolicy.ALL;

+        return new VariablePropagationPolicy() {

+            @Override

+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)

+                    throws AlgebricksException {

+                for (Mutable<ILogicalExpression> eRef : expressions) {

+                    ILogicalExpression e = eRef.getValue();

+                    if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                        VariableReferenceExpression v = (VariableReferenceExpression) e;

+                        target.addVariable(v.getVariableReference());

+                    }

+                }

+            }

+        };

     }

 

     @Override

@@ -105,7 +128,16 @@
 

     @Override

     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {

-        return createPropagatingAllInputsTypeEnvironment(ctx);

+        IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMetadataProvider());

+        IVariableTypeEnvironment childEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue());

+        for (Mutable<ILogicalExpression> exprRef : expressions) {

+            ILogicalExpression expr = exprRef.getValue();

+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;

+                env.setVarType(varRefExpr.getVariableReference(), childEnv.getType(expr));

+            }

+        }

+        return env;

     }

 

 }

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
new file mode 100644
index 0000000..6ca6d87
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class DistributeResultOperator extends AbstractLogicalOperator {
+    private List<Mutable<ILogicalExpression>> expressions;
+    private IDataSink dataSink;
+
+    public DistributeResultOperator(List<Mutable<ILogicalExpression>> expressions, IDataSink dataSink) {
+        this.expressions = expressions;
+        this.dataSink = dataSink;
+    }
+
+    public List<Mutable<ILogicalExpression>> getExpressions() {
+        return expressions;
+    }
+
+    public IDataSink getDataSink() {
+        return dataSink;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.DISTRIBUTE_RESULT;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitDistributeResultOperator(this, arg);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        boolean modif = false;
+        for (int i = 0; i < expressions.size(); i++) {
+            boolean b = visitor.transform(expressions.get(i));
+            if (b) {
+                modif = true;
+            }
+        }
+        return modif;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+
+    @Override
+    public boolean isMap() {
+        return false; // actually depends on the physical op.
+    }
+
+    @Override
+    public void recomputeSchema() {
+        schema = new ArrayList<LogicalVariable>();
+        schema.addAll(inputs.get(0).getValue().getSchema());
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
index 101b6f5..5aa858f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
@@ -51,7 +51,7 @@
 
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
-        return false;
+        return delegate.acceptExpressionTransform(transform);
     }
 
     @Override
@@ -108,4 +108,13 @@
         return this.createPropagatingAllInputsTypeEnvironment(ctx);
     }
 
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+    
+    public IOperatorExtension getDelegate() {
+        return delegate;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
index 98c3301..0a80337 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
 
+import java.util.Collection;
 import java.util.List;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -42,5 +43,8 @@
     void setPhysicalOperator(IPhysicalOperator physicalOperator);
 
     ExecutionMode getExecutionMode();
-
+    
+    public void getUsedVariables(Collection<LogicalVariable> usedVars);
+    
+    public void getProducedVariables(Collection<LogicalVariable> producedVars);
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java
index 3c6a699..8e67ed9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java
@@ -109,5 +109,4 @@
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
-
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
index 8c611b0..fda920a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
@@ -102,4 +102,8 @@
         return env;
     }
 
+    @Override
+    public boolean requiresVariableReferenceExpressions() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 0539cbe..1b4be1e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -47,8 +47,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -64,7 +66,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -476,6 +477,14 @@
     }
 
     @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        // propagateFDsAndEquivClasses(op, ctx);
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
     public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext ctx) throws AlgebricksException {
         // propagateFDsAndEquivClasses(op, ctx);
         setEmptyFDsEqClasses(op, ctx);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 31061db..ac6d887 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
@@ -345,7 +346,7 @@
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.UNNEST_MAP)
             return Boolean.FALSE;
-        UnnestOperator unnestOpArg = (UnnestOperator) copyAndSubstituteVar(op, arg);
+        UnnestMapOperator unnestOpArg = (UnnestMapOperator) copyAndSubstituteVar(op, arg);
         boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getVariables(), unnestOpArg.getVariables());
         if (!isomorphic)
             return Boolean.FALSE;
@@ -425,6 +426,17 @@
     }
 
     @Override
+    public Boolean visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT)
+            return Boolean.FALSE;
+        DistributeResultOperator writeOpArg = (DistributeResultOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), writeOpArg.getSchema());
+        return isomorphic;
+    }
+
+    @Override
     public Boolean visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT)
@@ -762,6 +774,14 @@
         }
 
         @Override
+        public ILogicalOperator visitDistributeResultOperator(DistributeResultOperator op, Void arg)
+                throws AlgebricksException {
+            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+            deepCopyExpressionRefs(newExpressions, op.getExpressions());
+            return new DistributeResultOperator(newExpressions, op.getDataSink());
+        }
+
+        @Override
         public ILogicalOperator visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
             ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             deepCopyExpressionRefs(newKeyExpressions, op.getKeyExpressions());
@@ -784,8 +804,8 @@
             deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
             List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
-            Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(((AbstractLogicalExpression)op.getFilterExpression())
-                    .cloneExpression());
+            Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
+                    ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
             return new IndexInsertDeleteOperator(op.getDataSourceIndex(), newPrimaryKeyExpressions,
                     newSecondaryKeyExpressions, newFilterExpression, op.getOperation());
         }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 562bb4c..b9544c7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -37,8 +37,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -54,7 +56,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -230,6 +231,13 @@
     }
 
     @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
+    @Override
     public Void visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 9b2f5a0..8f1d686 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -29,8 +29,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -46,7 +48,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -240,6 +241,12 @@
     }
 
     @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext arg)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 994c6cb..31adcba 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -32,8 +32,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -49,7 +51,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -223,6 +224,11 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {

         return null;

     }

@@ -249,6 +255,7 @@
 

     @Override

     public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {

+        op.getDelegate().getProducedVariables(producedVariables);

         return null;

     }

 }

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 9295179..cd0cee3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -31,8 +31,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -48,7 +50,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -85,7 +86,13 @@
 

     @Override

     public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {

-        standardLayout(op);

+        for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {

+            ILogicalExpression expr = exprRef.getValue();

+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;

+                schemaVariables.add(varRefExpr.getVariableReference());

+            }

+        }

         return null;

     }

 

@@ -232,6 +239,12 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {

+        standardLayout(op);

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {

         standardLayout(op);

         return null;

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 11e56ca..69fb3f8 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -33,8 +33,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -51,7 +53,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

@@ -335,6 +336,16 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)

+            throws AlgebricksException {

+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {

+            e.getValue().substituteVar(pair.first, pair.second);

+        }

+        substVarTypes(op, pair);

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)

             throws AlgebricksException {

         op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 3a82ccd..5361a19 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -25,14 +25,17 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -49,13 +52,17 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;

 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;

 

 public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> {

@@ -106,8 +113,49 @@
     }

 

     @Override

-    public Void visitExchangeOperator(ExchangeOperator op, Void arg) {

-        // does not use any variable

+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {

+        // Used variables depend on the physical operator.

+        if (op.getPhysicalOperator() != null) {

+            IPhysicalOperator physOp = op.getPhysicalOperator();

+            switch (physOp.getOperatorTag()) {

+                case BROADCAST_EXCHANGE:

+                case ONE_TO_ONE_EXCHANGE:

+                case RANDOM_MERGE_EXCHANGE: {

+                    // No variables used.

+                    break;

+                }

+                case HASH_PARTITION_EXCHANGE: {

+                    HashPartitionExchangePOperator concreteOp = (HashPartitionExchangePOperator) physOp;

+                    usedVariables.addAll(concreteOp.getHashFields());

+                    break;

+                }

+                case HASH_PARTITION_MERGE_EXCHANGE: {

+                    HashPartitionMergeExchangePOperator concreteOp = (HashPartitionMergeExchangePOperator) physOp;

+                    usedVariables.addAll(concreteOp.getPartitionFields());

+                    for (OrderColumn orderCol : concreteOp.getOrderColumns()) {

+                        usedVariables.add(orderCol.getColumn());

+                    }

+                    break;

+                }

+                case SORT_MERGE_EXCHANGE: {

+                    SortMergeExchangePOperator concreteOp = (SortMergeExchangePOperator) physOp;

+                    for (OrderColumn orderCol : concreteOp.getSortColumns()) {

+                        usedVariables.add(orderCol.getColumn());

+                    }

+                    break;

+                }

+                case RANGE_PARTITION_EXCHANGE: {

+                    RangePartitionPOperator concreteOp = (RangePartitionPOperator) physOp;

+                    for (OrderColumn partCol : concreteOp.getPartitioningFields()) {

+                        usedVariables.add(partCol.getColumn());

+                    }

+                    break;

+                }

+                default: {

+                    throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");

+                }

+            }

+        }

         return null;

     }

 

@@ -257,6 +305,14 @@
     }

 

     @Override

+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) {

+        for (Mutable<ILogicalExpression> expr : op.getExpressions()) {

+            expr.getValue().getUsedVariables(usedVariables);

+        }

+        return null;

+    }

+

+    @Override

     public Void visitWriteResultOperator(WriteResultOperator op, Void arg) {

         op.getPayloadExpression().getValue().getUsedVariables(usedVariables);

         for (Mutable<ILogicalExpression> e : op.getKeyExpressions()) {

@@ -297,6 +353,7 @@
 

     @Override

     public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {

+        op.getDelegate().getUsedVariables(usedVariables);

         return null;

     }

 

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 25f22e7..f66f99b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -61,14 +61,22 @@
             ITypingContext ctx) throws AlgebricksException {

         substituteVariables(op, v1, v2, true, ctx);

     }

-

+    

+    public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,

+            LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {

+        for (Mutable<ILogicalOperator> childOp : op.getInputs()) {

+            substituteVariablesInDescendantsAndSelf(childOp.getValue(), v1, v2, ctx);

+        }

+        substituteVariables(op, v1, v2, true, ctx);

+    }

+    

     public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,

             boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {

         ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(

                 goThroughNts, ctx);

         op.accept(visitor, new Pair<LogicalVariable, LogicalVariable>(v1, v2));

     }

-

+    

     public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {

         Set<T> varSet = new HashSet<T>();

         Set<T> varArgSet = new HashSet<T>();

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
new file mode 100644
index 0000000..302d4d2
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class DistributeResultPOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.DISTRIBUTE_RESULT;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        DistributeResultOperator write = (DistributeResultOperator) op;
+        IDataSink sink = write.getDataSink();
+        IPartitioningProperty pp = sink.getPartitioningProperty();
+        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
+        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        DistributeResultOperator resultOp = (DistributeResultOperator) op;
+        IMetadataProvider mp = context.getMetadataProvider();
+
+        JobSpecification spec = builder.getJobSpec();
+
+        int[] columns = new int[resultOp.getExpressions().size()];
+        int i = 0;
+        for (Mutable<ILogicalExpression> exprRef : resultOp.getExpressions()) {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new NotImplementedException("Only writing variable expressions is supported.");
+            }
+            VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+            LogicalVariable v = varRef.getVariableReference();
+            columns[i++] = inputSchemas[0].findVariable(v);
+        }
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+        IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op),
+                context, columns);
+
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
+                resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
+
+        builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = resultOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, resultOp, 0);
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index f3c9e5a..61d4880 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -158,5 +158,13 @@
                 comparatorFactories);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
+    
+    public List<LogicalVariable> getPartitionFields() {
+        return partitionFields;
+    }
+    
+    public List<OrderColumn> getOrderColumns() {
+        return orderColumns;
+    }
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index c737cc4..6da42b4 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -32,14 +32,21 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
 
 public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
 
@@ -90,6 +97,8 @@
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
         IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
                 keysLeftBranch, env, context);
+        IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(
+                keysLeftBranch, env, context);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
         int i = 0;
         IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
@@ -97,33 +106,75 @@
             Object t = env.getVarType(v);
             comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
         }
-        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc = null;
-        try {
-            switch (kind) {
-                case INNER: {
-                    opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
-                            maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                            hashFunFactories, comparatorFactories, recDescriptor);
-                    break;
-                }
-                case LEFT_OUTER: {
-                    INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
-                    for (int j = 0; j < nullWriterFactories.length; j++) {
-                        nullWriterFactories[j] = context.getNullWriterFactory();
-                    }
-                    opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
-                            maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                            hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
-                    break;
-                }
-                default: {
-                    throw new NotImplementedException();
-                }
+
+        boolean optimizedHashJoin = true;
+        for (IBinaryHashFunctionFamily family : hashFunFamilies) {
+            if (family == null) {
+                optimizedHashJoin = false;
+                break;
             }
-        } catch (HyracksDataException e) {
-            throw new AlgebricksException(e);
+        }
+
+        if (!optimizedHashJoin) {
+            try {
+                switch (kind) {
+                    case INNER: {
+                        opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+                                hashFunFactories, comparatorFactories, recDescriptor);
+                        break;
+                    }
+                    case LEFT_OUTER: {
+                        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+                        for (int j = 0; j < nullWriterFactories.length; j++) {
+                            nullWriterFactories[j] = context.getNullWriterFactory();
+                        }
+                        opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+                                hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
+                        break;
+                    }
+                    default: {
+                        throw new NotImplementedException();
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw new AlgebricksException(e);
+            }
+        } else {
+            try {
+                switch (kind) {
+                    case INNER: {
+                        opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysRight, keysLeft));
+                        break;
+                    }
+                    case LEFT_OUTER: {
+                        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+                        for (int j = 0; j < nullWriterFactories.length; j++) {
+                            nullWriterFactories[j] = context.getNullWriterFactory();
+                        }
+                        opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysRight, keysLeft), true, nullWriterFactories);
+                        break;
+                    }
+                    default: {
+                        throw new NotImplementedException();
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw new AlgebricksException(e);
+            }
         }
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
@@ -140,3 +191,72 @@
     }
 
 }
+
+/**
+ * {@ ITuplePairComparatorFactory} implementation for optimized hybrid hash join.
+ */
+class JoinMultiComparatorFactory implements ITuplePairComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final IBinaryComparatorFactory[] binaryComparatorFactories;
+    private final int[] keysLeft;
+    private final int[] keysRight;
+
+    public JoinMultiComparatorFactory(IBinaryComparatorFactory[] binaryComparatorFactory, int[] keysLeft,
+            int[] keysRight) {
+        this.binaryComparatorFactories = binaryComparatorFactory;
+        this.keysLeft = keysLeft;
+        this.keysRight = keysRight;
+    }
+
+    @Override
+    public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+        IBinaryComparator[] binaryComparators = new IBinaryComparator[binaryComparatorFactories.length];
+        for (int i = 0; i < binaryComparators.length; i++) {
+            binaryComparators[i] = binaryComparatorFactories[i].createBinaryComparator();
+        }
+        return new JoinMultiComparator(binaryComparators, keysLeft, keysRight);
+    }
+}
+
+/**
+ * {@ ITuplePairComparator} implementation for optimized hybrid hash join.
+ * The comparator applies multiple binary comparators, one for each key pairs
+ */
+class JoinMultiComparator implements ITuplePairComparator {
+    private final IBinaryComparator[] binaryComparators;
+    private final int[] keysLeft;
+    private final int[] keysRight;
+
+    public JoinMultiComparator(IBinaryComparator[] bComparator, int[] keysLeft, int[] keysRight) {
+        this.binaryComparators = bComparator;
+        this.keysLeft = keysLeft;
+        this.keysRight = keysRight;
+    }
+
+    @Override
+    public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+        int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+        int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+        int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+        int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+        for (int i = 0; i < binaryComparators.length; i++) {
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, keysLeft[i]);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, keysLeft[i]);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, keysRight[i]);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, keysRight[i]);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = binaryComparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0,
+                    accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index 8cbd2d8..d153f90 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -44,6 +44,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -56,8 +57,8 @@
 import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
 
 /**
- * Left input is broadcast and preserves its local properties.
- * Right input can be partitioned in any way.
+ * Left input is broadcast and preserves its local properties. Right input can
+ * be partitioned in any way.
  */
 public class NLJoinPOperator extends AbstractJoinPOperator {
 
@@ -97,7 +98,7 @@
                 pp = pv1.getPartitioningProperty();
             }
         } else {
-        	pp = IPartitioningProperty.UNPARTITIONED;
+            pp = IPartitioningProperty.UNPARTITIONED;
         }
 
         List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
@@ -122,7 +123,8 @@
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
-        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
         IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
         conditionInputSchemas[0] = propagatedSchema;
         IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -135,10 +137,19 @@
 
         switch (kind) {
             case INNER: {
-                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize);
+                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
+                        null);
                 break;
             }
-            case LEFT_OUTER:
+            case LEFT_OUTER: {
+                INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+                for (int j = 0; j < nullWriterFactories.length; j++) {
+                    nullWriterFactories[j] = context.getNullWriterFactory();
+                }
+                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
+                        nullWriterFactories);
+                break;
+            }
             default: {
                 throw new NotImplementedException();
             }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index 7e3c935..8875f6c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -16,6 +16,7 @@
 
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -69,5 +70,9 @@
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
         throw new NotImplementedException();
     }
+    
+    public List<OrderColumn> getPartitioningFields() {
+        return partitioningFields;
+    }
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index a94c78e..fc0c433 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -30,8 +30,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -48,7 +50,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -164,6 +165,13 @@
     }
 
     @Override
+    public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("distribute result ").append(op.getExpressions());
+        return buffer.toString();
+    }
+
+    @Override
     public String visitWriteResultOperator(WriteResultOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("load ").append(op.getDataSource()).append(" from ")
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 6b5949e..23dac2a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -20,8 +20,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -37,7 +39,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -97,6 +98,8 @@
 
     public R visitWriteOperator(WriteOperator op, T arg) throws AlgebricksException;
 
+    public R visitDistributeResultOperator(DistributeResultOperator op, T arg) throws AlgebricksException;
+
     public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
 
     public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 22a1a81..365d1a5 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -42,151 +43,167 @@
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 
 public class JobGenContext {
-    private final IOperatorSchema outerFlowSchema;
-    private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
-    private final ISerializerDeserializerProvider serializerDeserializerProvider;
-    private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
-    private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
-    private final IPrinterFactoryProvider printerFactoryProvider;
-    private final ITypeTraitProvider typeTraitProvider;
-    private final IMetadataProvider<?, ?> metadataProvider;
-    private final INullWriterFactory nullWriterFactory;
-    private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
-    private final Object appContext;
-    private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
-    private final IBinaryIntegerInspectorFactory integerInspectorFactory;
-    private final IExpressionRuntimeProvider expressionRuntimeProvider;
-    private final IExpressionTypeComputer expressionTypeComputer;
-    private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
-    private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
-    private final int frameSize;
-    private AlgebricksPartitionConstraint clusterLocations;
-    private int varCounter;
-    private final ITypingContext typingContext;
+	private final IOperatorSchema outerFlowSchema;
+	private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
+	private final ISerializerDeserializerProvider serializerDeserializerProvider;
+	private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+	private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
+	private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
+	private final IPrinterFactoryProvider printerFactoryProvider;
+	private final ITypeTraitProvider typeTraitProvider;
+	private final IMetadataProvider<?, ?> metadataProvider;
+	private final INullWriterFactory nullWriterFactory;
+	private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
+	private final Object appContext;
+	private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
+	private final IBinaryIntegerInspectorFactory integerInspectorFactory;
+	private final IExpressionRuntimeProvider expressionRuntimeProvider;
+	private final IExpressionTypeComputer expressionTypeComputer;
+	private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+	private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+	private final int frameSize;
+	private AlgebricksPartitionConstraint clusterLocations;
+	private int varCounter;
+	private final ITypingContext typingContext;
 
-    public JobGenContext(IOperatorSchema outerFlowSchema, IMetadataProvider<?, ?> metadataProvider, Object appContext,
-            ISerializerDeserializerProvider serializerDeserializerProvider,
-            IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
-            IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
-            IBinaryBooleanInspectorFactory booleanInspectorFactory,
-            IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
-            INullWriterFactory nullWriterFactory,
-            INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
-            IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
-            INullableTypeComputer nullableTypeComputer, ITypingContext typingContext,
-            IExpressionEvalSizeComputer expressionEvalSizeComputer,
-            IPartialAggregationTypeComputer partialAggregationTypeComputer, int frameSize,
-            AlgebricksPartitionConstraint clusterLocations) {
-        this.outerFlowSchema = outerFlowSchema;
-        this.metadataProvider = metadataProvider;
-        this.appContext = appContext;
-        this.serializerDeserializerProvider = serializerDeserializerProvider;
-        this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
-        this.comparatorFactoryProvider = comparatorFactoryProvider;
-        this.typeTraitProvider = typeTraitProvider;
-        this.booleanInspectorFactory = booleanInspectorFactory;
-        this.integerInspectorFactory = integerInspectorFactory;
-        this.printerFactoryProvider = printerFactoryProvider;
-        this.clusterLocations = clusterLocations;
-        this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
-        this.nullWriterFactory = nullWriterFactory;
-        this.expressionRuntimeProvider = expressionRuntimeProvider;
-        this.expressionTypeComputer = expressionTypeComputer;
-        this.typingContext = typingContext;
-        this.expressionEvalSizeComputer = expressionEvalSizeComputer;
-        this.partialAggregationTypeComputer = partialAggregationTypeComputer;
-        this.frameSize = frameSize;
-        this.varCounter = 0;
-    }
+	public JobGenContext(
+			IOperatorSchema outerFlowSchema,
+			IMetadataProvider<?, ?> metadataProvider,
+			Object appContext,
+			ISerializerDeserializerProvider serializerDeserializerProvider,
+			IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
+			IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
+			IBinaryComparatorFactoryProvider comparatorFactoryProvider,
+			ITypeTraitProvider typeTraitProvider,
+			IBinaryBooleanInspectorFactory booleanInspectorFactory,
+			IBinaryIntegerInspectorFactory integerInspectorFactory,
+			IPrinterFactoryProvider printerFactoryProvider,
+			INullWriterFactory nullWriterFactory,
+			INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
+			IExpressionRuntimeProvider expressionRuntimeProvider,
+			IExpressionTypeComputer expressionTypeComputer,
+			INullableTypeComputer nullableTypeComputer,
+			ITypingContext typingContext,
+			IExpressionEvalSizeComputer expressionEvalSizeComputer,
+			IPartialAggregationTypeComputer partialAggregationTypeComputer,
+			int frameSize, AlgebricksPartitionConstraint clusterLocations) {
+		this.outerFlowSchema = outerFlowSchema;
+		this.metadataProvider = metadataProvider;
+		this.appContext = appContext;
+		this.serializerDeserializerProvider = serializerDeserializerProvider;
+		this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
+		this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+		this.comparatorFactoryProvider = comparatorFactoryProvider;
+		this.typeTraitProvider = typeTraitProvider;
+		this.booleanInspectorFactory = booleanInspectorFactory;
+		this.integerInspectorFactory = integerInspectorFactory;
+		this.printerFactoryProvider = printerFactoryProvider;
+		this.clusterLocations = clusterLocations;
+		this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
+		this.nullWriterFactory = nullWriterFactory;
+		this.expressionRuntimeProvider = expressionRuntimeProvider;
+		this.expressionTypeComputer = expressionTypeComputer;
+		this.typingContext = typingContext;
+		this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+		this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+		this.frameSize = frameSize;
+		this.varCounter = 0;
+	}
 
-    public IOperatorSchema getOuterFlowSchema() {
-        return outerFlowSchema;
-    }
+	public IOperatorSchema getOuterFlowSchema() {
+		return outerFlowSchema;
+	}
 
-    public AlgebricksPartitionConstraint getClusterLocations() {
-        return clusterLocations;
-    }
+	public AlgebricksPartitionConstraint getClusterLocations() {
+		return clusterLocations;
+	}
 
-    public IMetadataProvider<?, ?> getMetadataProvider() {
-        return metadataProvider;
-    }
+	public IMetadataProvider<?, ?> getMetadataProvider() {
+		return metadataProvider;
+	}
 
-    public Object getAppContext() {
-        return appContext;
-    }
+	public Object getAppContext() {
+		return appContext;
+	}
 
-    public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
-        return serializerDeserializerProvider;
-    }
+	public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
+		return serializerDeserializerProvider;
+	}
 
-    public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
-        return hashFunctionFactoryProvider;
-    }
+	public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
+		return hashFunctionFactoryProvider;
+	}
 
-    public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
-        return comparatorFactoryProvider;
-    }
+	public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
+		return hashFunctionFamilyProvider;
+	}
 
-    public ITypeTraitProvider getTypeTraitProvider() {
-        return typeTraitProvider;
-    }
+	public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
+		return comparatorFactoryProvider;
+	}
 
-    public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
-        return booleanInspectorFactory;
-    }
+	public ITypeTraitProvider getTypeTraitProvider() {
+		return typeTraitProvider;
+	}
 
-    public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
-        return integerInspectorFactory;
-    }
+	public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+		return booleanInspectorFactory;
+	}
 
-    public IPrinterFactoryProvider getPrinterFactoryProvider() {
-        return printerFactoryProvider;
-    }
+	public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+		return integerInspectorFactory;
+	}
 
-    public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
-        return expressionRuntimeProvider;
-    }
+	public IPrinterFactoryProvider getPrinterFactoryProvider() {
+		return printerFactoryProvider;
+	}
 
-    public IOperatorSchema getSchema(ILogicalOperator op) {
-        return schemaMap.get(op);
-    }
+	public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
+		return expressionRuntimeProvider;
+	}
 
-    public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
-        schemaMap.put(op, schema);
-    }
+	public IOperatorSchema getSchema(ILogicalOperator op) {
+		return schemaMap.get(op);
+	}
 
-    public LogicalVariable createNewVar() {
-        varCounter++;
-        LogicalVariable var = new LogicalVariable(-varCounter);
-        return var;
-    }
+	public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
+		schemaMap.put(op, schema);
+	}
 
-    public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env) throws AlgebricksException {
-        return expressionTypeComputer.getType(expr, typingContext.getMetadataProvider(), env);
-    }
+	public LogicalVariable createNewVar() {
+		varCounter++;
+		LogicalVariable var = new LogicalVariable(-varCounter);
+		return var;
+	}
 
-    public INullWriterFactory getNullWriterFactory() {
-        return nullWriterFactory;
-    }
+	public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env)
+			throws AlgebricksException {
+		return expressionTypeComputer.getType(expr,
+				typingContext.getMetadataProvider(), env);
+	}
 
-    public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
-        return normalizedKeyComputerFactoryProvider;
-    }
+	public INullWriterFactory getNullWriterFactory() {
+		return nullWriterFactory;
+	}
 
-    public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
-        return expressionEvalSizeComputer;
-    }
+	public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
+		return normalizedKeyComputerFactoryProvider;
+	}
 
-    public int getFrameSize() {
-        return frameSize;
-    }
+	public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+		return expressionEvalSizeComputer;
+	}
 
-    public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
-        return partialAggregationTypeComputer;
-    }
+	public int getFrameSize() {
+		return frameSize;
+	}
 
-    public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
-        return typingContext.getOutputTypeEnvironment(op);
-    }
+	public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
+		return partialAggregationTypeComputer;
+	}
+
+	public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
+		return typingContext.getOutputTypeEnvironment(op);
+	}
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
index 790fb93..530d19c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
 import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -31,6 +32,7 @@
 import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -42,8 +44,8 @@
 
     @SuppressWarnings("rawtypes")
     public static RecordDescriptor mkRecordDescriptor(IVariableTypeEnvironment env, IOperatorSchema opSchema,
-            JobGenContext context) throws AlgebricksException {        
-		ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
+            JobGenContext context) throws AlgebricksException {
+        ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
         ITypeTraits[] typeTraits = new ITypeTraits[opSchema.getSize()];
         ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
         ITypeTraitProvider ttp = context.getTypeTraitProvider();
@@ -59,7 +61,7 @@
         }
         return new RecordDescriptor(fields, typeTraits);
     }
-    
+
     public static IPrinterFactory[] mkPrinterFactories(IOperatorSchema opSchema, IVariableTypeEnvironment env,
             JobGenContext context, int[] printColumns) throws AlgebricksException {
         IPrinterFactory[] pf = new IPrinterFactory[printColumns.length];
@@ -94,7 +96,20 @@
         }
         return funFactories;
     }
-    
+
+    public static IBinaryHashFunctionFamily[] variablesToBinaryHashFunctionFamilies(
+            Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+            throws AlgebricksException {
+        IBinaryHashFunctionFamily[] funFamilies = new IBinaryHashFunctionFamily[varLogical.size()];
+        int i = 0;
+        IBinaryHashFunctionFamilyProvider bhffProvider = context.getBinaryHashFunctionFamilyProvider();
+        for (LogicalVariable var : varLogical) {
+            Object type = env.getVarType(var);
+            funFamilies[i++] = bhffProvider.getBinaryHashFunctionFamily(type);
+        }
+        return funFamilies;
+    }
+
     public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
             Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
             throws AlgebricksException {
@@ -107,15 +122,14 @@
         }
         return compFactories;
     }
-    
-    public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
-            List<LogicalVariable> varLogical, int start, int size, IVariableTypeEnvironment env, JobGenContext context)
-            throws AlgebricksException {
+
+    public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(List<LogicalVariable> varLogical,
+            int start, int size, IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
         IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[size];
         IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
         for (int i = 0; i < size; i++) {
-                Object type = env.getVarType(varLogical.get(start + i));
-                compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
+            Object type = env.getVarType(varLogical.get(start + i));
+            compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
         }
         return compFactories;
     }
@@ -144,15 +158,14 @@
         }
         return typeTraits;
     }
-    
-    public static ITypeTraits[] variablesToTypeTraits(
-            List<LogicalVariable> varLogical, int start, int size, IVariableTypeEnvironment env, JobGenContext context)
-            throws AlgebricksException {
+
+    public static ITypeTraits[] variablesToTypeTraits(List<LogicalVariable> varLogical, int start, int size,
+            IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
         ITypeTraits[] typeTraits = new ITypeTraits[size];
         ITypeTraitProvider typeTraitProvider = context.getTypeTraitProvider();
         for (int i = 0; i < size; i++) {
-                Object type = env.getVarType(varLogical.get(start + i));
-                typeTraits[i] = typeTraitProvider.getTypeTrait(type);
+            Object type = env.getVarType(varLogical.get(start + i));
+            typeTraits[i] = typeTraitProvider.getTypeTrait(type);
         }
         return typeTraits;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index bbe8fb3..63a6852 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -54,6 +54,8 @@
         operatorVisitedToParents.clear();
         builder.buildSpec(rootOps);
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        // Do not do activity cluster planning because it is slow on large clusters
+        spec.setUseConnectorPolicyForScheduling(false);
         return spec;
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 7c63e01..738fc7f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -135,6 +135,7 @@
     /*
      * returns true if op1 and op2 have already been compared
      */
+    @Override
     public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2) {
         HashSet<ILogicalOperator> ops = alreadyCompared.get(op1);
         if (ops == null) {
@@ -151,6 +152,11 @@
             }
         }
     }
+    
+    @Override
+    public void removeFromAlreadyCompared(ILogicalOperator op1) {
+        alreadyCompared.remove(op1);
+    }
 
     public void addNotToBeInlinedVar(LogicalVariable var) {
         notToBeInlinedVars.add(var);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 9ce910b..fc6c198 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -17,8 +17,8 @@
     public PhysicalOptimizationConfig() {
         int frameSize = 32768;
         setInt(FRAMESIZE, frameSize);
-        setInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 512 * MB) / frameSize));
-        setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 256 * MB) / frameSize));
+        setInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 32 * MB) / frameSize));
+        setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 32 * MB) / frameSize));
 
         // use http://www.rsok.com/~jrm/printprimes.html to find prime numbers
         setInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);