GroupJoin: Merged with stabilization rev 1821


git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_groupjoin_integration@1822 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/INestedPlan.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/INestedPlan.java
new file mode 100644
index 0000000..1cee5ef
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/INestedPlan.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.base;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+
+public interface INestedPlan {
+	
+	public LogicalOperatorTag getOperatorTag();
+	
+    public List<ILogicalPlan> getNestedPlans();
+
+    public boolean hasNestedPlans();
+    
+    public LinkedList<Mutable<ILogicalOperator>> allRootsInReverseOrder();
+
+    //
+    // @Override
+    // public void computeConstraintsAndEquivClasses() {
+    // for (ILogicalPlan p : nestedPlans) {
+    // for (LogicalOperatorReference r : p.getRoots()) {
+    // AbstractLogicalOperator op = (AbstractLogicalOperator) r.getOperator();
+    // equivalenceClasses.putAll(op.getEquivalenceClasses());
+    // functionalDependencies.addAll(op.getFDs());
+    // }
+    // }
+    // }
+
+    public void recomputeSchema();
+
+    public boolean isMap();
+    
+    public void getUsedVariablesExceptNestedPlans(Collection<LogicalVariable> vars);
+
+    public void getProducedVariablesExceptNestedPlans(Collection<LogicalVariable> vars);
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 5234d2c..1c9628f 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -23,6 +23,7 @@
     GROUP,
     EMPTYTUPLESOURCE,
     EXCHANGE,
+    GROUPJOIN,
     INNERJOIN,
     LEFTOUTERJOIN,
     LIMIT,
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index a969372..8349cb2 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -10,10 +10,12 @@
     EMPTY_TUPLE_SOURCE,
     EXTERNAL_GROUP_BY,
     IN_MEMORY_HASH_JOIN,
+    IN_MEMORY_HASH_GROUP_JOIN,
     HASH_GROUP_BY,
     HASH_PARTITION_EXCHANGE,
     HASH_PARTITION_MERGE_EXCHANGE,
     HYBRID_HASH_JOIN,
+    HYBRID_HASH_GROUP_JOIN,
     HDFS_READER,
     IN_MEMORY_STABLE_SORT,
     MICRO_PRE_CLUSTERED_GROUP_BY,
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java
index a720039..45d4be9 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
 
 /**
  * 
@@ -96,6 +97,7 @@
 
     @Override
     public <R, T> R accept(ILogicalExpressionVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(" *** AggregateFunctionCallExpression: isTwoStep = " + isTwoStep());
         return visitor.visitAggregateFunctionCallExpression(this, arg);
     }
 
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java
index ea4f3e0..7fb6e68 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java
@@ -23,9 +23,10 @@
 
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 
-public abstract class AbstractOperatorWithNestedPlans extends AbstractLogicalOperator {
+public abstract class AbstractOperatorWithNestedPlans extends AbstractLogicalOperator implements INestedPlan{
     protected final List<ILogicalPlan> nestedPlans;
 
     public AbstractOperatorWithNestedPlans() {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupJoinOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupJoinOperator.java
new file mode 100644
index 0000000..6197f03
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupJoinOperator.java
@@ -0,0 +1,337 @@
+/*

+ * Copyright 2009-2010 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ * 

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;

+

+import java.io.File;

+import java.io.FileWriter;

+import java.io.PrintWriter;

+import java.util.ArrayList;

+import java.util.Collection;

+import java.util.LinkedList;

+import java.util.List;

+

+import org.apache.commons.lang3.mutable.Mutable;

+import org.apache.commons.lang3.mutable.MutableObject;

+

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.TypePropagationPolicy;

+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;

+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer;

+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;

+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer;

+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;

+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;

+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;

+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;

+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;

+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;

+

+public class GroupJoinOperator extends AbstractBinaryJoinOperator implements INestedPlan {

+

+    private final List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gByList;

+    private final List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList;

+    private Mutable<List<ILogicalPlan>> nestedPlans;

+    

+    public GroupJoinOperator(JoinKind joinKind, Mutable<ILogicalExpression> condition,

+            Mutable<ILogicalOperator> input1, Mutable<ILogicalOperator> input2) {

+        super(JoinKind.LEFT_OUTER, condition);

+        gByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();

+        decorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();

+        this.nestedPlans = new MutableObject<List<ILogicalPlan>>();

+    }

+

+    public GroupJoinOperator(Mutable<ILogicalExpression> condition) {

+        super(JoinKind.LEFT_OUTER, condition);

+        gByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();

+        decorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();

+        this.nestedPlans = new MutableObject<List<ILogicalPlan>>();

+    }

+

+    public GroupJoinOperator(JoinKind joinKind, Mutable<ILogicalExpression> condition,

+            Mutable<ILogicalOperator> input1, Mutable<ILogicalOperator> input2,

+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList,

+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList, List<ILogicalPlan> nestedPlans) throws AlgebricksException {

+        super(joinKind, condition, input1, input2);

+        this.gByList = groupByList;

+        this.decorList = decorList;

+        this.nestedPlans = new MutableObject<List<ILogicalPlan>>(nestedPlans);

+    }

+

+    @Override

+    public LogicalOperatorTag getOperatorTag() {

+        return LogicalOperatorTag.GROUPJOIN;

+    }

+

+    public boolean hasNestedPlans(){

+    	return true;

+    }

+    

+    public List<ILogicalPlan> getNestedPlans() {

+        return nestedPlans.getValue();

+    }

+    

+    public void setNestedPlans(List<ILogicalPlan> nPlans) {

+        this.nestedPlans.setValue(nPlans);

+    }

+    

+    public List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> getGroupByList() {

+        return gByList;

+    }

+

+    public List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> getDecorList() {

+        return decorList;

+    }

+

+    public String gByListToString() {

+        return veListToString(gByList);

+    }

+

+    public String decorListToString() {

+        return veListToString(decorList);

+    }

+

+    public List<LogicalVariable> getGbyVarList() {

+        List<LogicalVariable> varList = new ArrayList<LogicalVariable>(gByList.size());

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gByList) {

+            ILogicalExpression expr = ve.second.getValue();

+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                VariableReferenceExpression v = (VariableReferenceExpression) expr;

+                varList.add(v.getVariableReference());

+            }

+        }

+        return varList;

+    }

+

+    public static String veListToString(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList) {

+        StringBuilder sb = new StringBuilder();

+        sb.append("[");

+        boolean fst = true;

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : vePairList) {

+            if (fst) {

+                fst = false;

+            } else {

+                sb.append("; ");

+            }

+            if (ve.first != null) {

+                sb.append(ve.first + " := " + ve.second);

+            } else {

+                sb.append(ve.second.getValue());

+            }

+        }

+        sb.append("]");

+        return sb.toString();

+    }

+

+    @Override

+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {

+        return visitor.visitGroupJoinOperator(this, arg);

+    }

+

+    @Override

+    public void recomputeSchema(){

+        schema = new ArrayList<LogicalVariable>();

+//        schema.addAll(inputs.get(0).getValue().getSchema());

+//        schema.addAll(inputs.get(1).getValue().getSchema());

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {

+            if (p.first != null)

+            	schema.add(p.first);

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {

+            schema.add(getDecorVariable(p));

+        }

+        

+        for(ILogicalPlan p : getNestedPlans()){

+        	if (p == null)

+        		continue;

+        	for(Mutable<ILogicalOperator> o : p.getRoots()){

+        		AbstractLogicalOperator nestedOp = (AbstractLogicalOperator)o.getValue();

+        		if (nestedOp != null){

+        			schema.addAll(nestedOp.getSchema());

+        		}

+        	}

+        }

+    }

+

+	@Override

+	public LinkedList<Mutable<ILogicalOperator>> allRootsInReverseOrder() {

+		// TODO Auto-generated method stub

+		return null;

+	}

+

+	@Override

+	public void getProducedVariablesExceptNestedPlans(Collection<LogicalVariable> vars) {

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {

+            if (p.first != null) {

+                vars.add(p.first);

+            }

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {

+            if (p.first != null) {

+                vars.add(p.first);

+            }

+        }

+	}

+

+	@Override

+	public void getUsedVariablesExceptNestedPlans(Collection<LogicalVariable> vars) {

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : gByList) {

+            g.second.getValue().getUsedVariables(vars);

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : decorList) {

+            g.second.getValue().getUsedVariables(vars);

+        }

+        getCondition().getValue().getUsedVariables(vars);

+	}

+	

+    @Override

+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {

+        ILogicalOperator child1 = inputs.get(0).getValue();

+        ILogicalOperator child2 = inputs.get(1).getValue();

+        IVariableTypeEnvironment env1 = ctx.getOutputTypeEnvironment(child1);

+        IVariableTypeEnvironment env2 = ctx.getOutputTypeEnvironment(child2);

+        int n = 0;

+        for (ILogicalPlan p : getNestedPlans()) {

+            n += p.getRoots().size();

+        }

+        ITypeEnvPointer[] envPointers = new ITypeEnvPointer[n];

+        int i = 0;

+        for (ILogicalPlan p : getNestedPlans()) {

+            for (Mutable<ILogicalOperator> r : p.getRoots()) {

+//                ctx.setOutputTypeEnvironment(r.getValue(), env1);

+                envPointers[i] = new OpRefTypeEnvPointer(r, ctx);

+                i++;

+            }

+        }

+        

+        IVariableTypeEnvironment env;

+        

+        if(getJoinKind() == JoinKind.LEFT_OUTER){

+        	env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),

+                ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.LEFT_OUTER, envPointers);

+        }

+        else

+        	env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),

+                    ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);

+        

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : getGroupByList()) {

+            ILogicalExpression expr = p.second.getValue();

+            if (p.first != null) {

+                env.setVarType(p.first, env1.getType(expr));

+                if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                    LogicalVariable v1 = ((VariableReferenceExpression) expr).getVariableReference();

+                    env.setVarType(v1, env1.getVarType(v1));

+                }

+            } else {

+                VariableReferenceExpression vre = (VariableReferenceExpression) p.second.getValue();

+                LogicalVariable v2 = vre.getVariableReference();

+                env.setVarType(v2, env1.getVarType(v2));

+            }

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : getDecorList()) {

+            ILogicalExpression expr = p.second.getValue();

+            if (p.first != null) {

+                env.setVarType(p.first, env1.getType(expr));

+            } else {

+                VariableReferenceExpression vre = (VariableReferenceExpression) p.second.getValue();

+                LogicalVariable v2 = vre.getVariableReference();

+                env.setVarType(v2, env1.getVarType(v2));

+            }

+        }

+        return env;

+    }

+

+	@Override

+    public VariablePropagationPolicy getVariablePropagationPolicy() {

+		VariablePropagationPolicy groupJoinVarPropPolicy = new VariablePropagationPolicy() {

+

+            @Override

+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)

+                    throws AlgebricksException {

+                for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {

+                    ILogicalExpression expr = p.second.getValue();

+                    if (p.first != null) {

+                        target.addVariable(p.first);

+                    } else {

+                        if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {

+                            throw new AlgebricksException("GroupJoin expects variable references.");

+                        }

+                        VariableReferenceExpression v = (VariableReferenceExpression) expr;

+                        target.addVariable(v.getVariableReference());

+                    }

+                }

+                for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {

+                    ILogicalExpression expr = p.second.getValue();

+                    if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {

+                        throw new AlgebricksException("GroupJoin expects variable references.");

+                    }

+                    VariableReferenceExpression v = (VariableReferenceExpression) expr;

+                    LogicalVariable decor = v.getVariableReference();

+                    if (p.first != null) {

+                        target.addVariable(p.first);

+                    } else {

+                        target.addVariable(decor);

+                    }

+                }

+            }

+        };

+        

+        for(ILogicalPlan p : getNestedPlans()){

+        	if (p == null)

+        		continue;

+        	for(Mutable<ILogicalOperator> o : p.getRoots()){

+        		AbstractLogicalOperator nestedOp = (AbstractLogicalOperator)o.getValue();

+        		if (nestedOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE)

+        			groupJoinVarPropPolicy = VariablePropagationPolicy.concat(groupJoinVarPropPolicy, nestedOp.getVariablePropagationPolicy());

+        	}

+        }

+        

+        return groupJoinVarPropPolicy;

+    }

+

+    @Override

+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {

+        boolean b = false;

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {

+            if (visitor.transform(p.second)) {

+                b = true;

+            }

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {

+            if (visitor.transform(p.second)) {

+                b = true;

+            }

+        }

+        return b;

+    }

+

+    public static LogicalVariable getDecorVariable(Pair<LogicalVariable, Mutable<ILogicalExpression>> p) {

+        if (p.first != null) {

+            return p.first;

+        } else {

+            VariableReferenceExpression e = (VariableReferenceExpression) p.second.getValue();

+            return e.getVariableReference();

+        }

+    }

+

+

+}

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java
index a9a8f55..1c0cf22 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java
@@ -42,6 +42,10 @@
         return dataSourceReference.getValue().getInputs().get(0).getValue();
     }
 
+    public ILogicalOperator getSourceOperator(int inputIdx) {
+        return dataSourceReference.getValue().getInputs().get(inputIdx).getValue();
+    }
+    
     @Override
     public LogicalOperatorTag getOperatorTag() {
         return LogicalOperatorTag.NESTEDTUPLESOURCE;
@@ -88,8 +92,11 @@
 
             @Override
             public IVariableTypeEnvironment getTypeEnv() {
-                ILogicalOperator op = dataSourceReference.getValue().getInputs().get(0).getValue();
-                return ctx.getOutputTypeEnvironment(op);
+                AbstractLogicalOperator dsr = (AbstractLogicalOperator)dataSourceReference.getValue();
+            	if(dsr.getOperatorTag() == LogicalOperatorTag.GROUPJOIN)
+            		return ctx.getOutputTypeEnvironment(dsr.getInputs().get(1).getValue());
+            	else
+            		return ctx.getOutputTypeEnvironment(dsr.getInputs().get(0).getValue());
             }
         };
         return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getNullableTypeComputer(),
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 0539cbe..943a323 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -50,6 +50,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -305,6 +306,147 @@
     }
 
     @Override
+    public Void visitGroupJoinOperator(GroupJoinOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        Map<LogicalVariable, EquivalenceClass> equivalenceClasses = new HashMap<LogicalVariable, EquivalenceClass>();
+        List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
+        ctx.putEquivalenceClassMap(op, equivalenceClasses);
+        ctx.putFDList(op, functionalDependencies);
+
+        List<FunctionalDependency> inheritedFDs = new ArrayList<FunctionalDependency>();
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                ILogicalOperator op2 = r.getValue();
+                equivalenceClasses.putAll(getOrComputeEqClasses(op2, ctx));
+                inheritedFDs.addAll(getOrComputeFDs(op2, ctx));
+            }
+        }
+        
+        ILogicalOperator opLeft = op.getInputs().get(0).getValue();
+        inheritedFDs.addAll(getOrComputeFDs(opLeft, ctx));
+        ILogicalOperator opRight = op.getInputs().get(1).getValue();
+        inheritedFDs.addAll(getOrComputeFDs(opRight, ctx));
+
+        Map<LogicalVariable, EquivalenceClass> inheritedEcs = new HashMap<LogicalVariable, EquivalenceClass>();
+        inheritedEcs.putAll(getOrComputeEqClasses(opLeft, ctx));
+        inheritedEcs.putAll(getOrComputeEqClasses(opRight, ctx));
+        
+        ILogicalExpression joinExpr = op.getCondition().getValue();
+        
+        // Join part of GroupJoin
+        switch(op.getJoinKind()){
+        	case INNER:
+        		joinExpr.getConstraintsAndEquivClasses(functionalDependencies, equivalenceClasses);
+        		break;
+        	case LEFT_OUTER:
+        		Collection<LogicalVariable> leftSideVars;
+        		if (opLeft.getSchema() == null) {
+        			leftSideVars = new LinkedList<LogicalVariable>();
+        			VariableUtilities.getLiveVariables(opLeft, leftSideVars);
+        			// actually, not all produced vars. are visible (due to projection)
+        			// so using cached schema is better and faster
+        		} else {
+        			leftSideVars = opLeft.getSchema();
+        		}
+        		joinExpr.getConstraintsForOuterJoin(functionalDependencies, leftSideVars);
+        		break;
+        	default:
+        		throw new NotImplementedException();
+        }
+
+        // Group part of GroupJoin
+        for (FunctionalDependency inherited : inheritedFDs) {
+            boolean isCoveredByGbyOrDecorVars = true;
+            List<LogicalVariable> newHead = new ArrayList<LogicalVariable>(inherited.getHead().size());
+            for (LogicalVariable v : inherited.getHead()) {
+                LogicalVariable vnew = getNewGbyVar(op, v);
+                if (vnew == null) {
+                    vnew = getNewDecorVar(op, v);
+                    if (vnew == null) {
+                        isCoveredByGbyOrDecorVars = false;
+                    }
+                    break;
+                }
+                newHead.add(vnew);
+            }
+
+            if (isCoveredByGbyOrDecorVars) {
+                List<LogicalVariable> newTail = new ArrayList<LogicalVariable>();
+                for (LogicalVariable v2 : inherited.getTail()) {
+                    LogicalVariable v3 = getNewGbyVar(op, v2);
+                    if (v3 != null) {
+                        newTail.add(v3);
+                    }
+                }
+                if (!newTail.isEmpty()) {
+                    FunctionalDependency newFd = new FunctionalDependency(newHead, newTail);
+                    functionalDependencies.add(newFd);
+                }
+            }
+        }
+
+        List<LogicalVariable> premiseGby = new LinkedList<LogicalVariable>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gByList = op.getGroupByList();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+            premiseGby.add(p.first);
+        }
+
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList = op.getDecorList();
+
+        LinkedList<LogicalVariable> conclDecor = new LinkedList<LogicalVariable>();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
+            conclDecor.add(GroupJoinOperator.getDecorVariable(p));
+        }
+        if (!conclDecor.isEmpty()) {
+            functionalDependencies.add(new FunctionalDependency(premiseGby, conclDecor));
+        }
+
+        Set<LogicalVariable> gbySet = new HashSet<LogicalVariable>();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression v = (VariableReferenceExpression) expr;
+                gbySet.add(v.getVariableReference());
+            }
+        }
+        LocalGroupingProperty lgp = new LocalGroupingProperty(gbySet);
+        lgp.normalizeGroupingColumns(inheritedEcs, inheritedFDs);
+        Set<LogicalVariable> normSet = lgp.getColumnSet();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> newGbyList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        boolean changed = false;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+                LogicalVariable v2 = varRef.getVariableReference();
+                EquivalenceClass ec2 = inheritedEcs.get(v2);
+                LogicalVariable v3;
+                if (ec2 != null && !ec2.representativeIsConst()) {
+                    v3 = ec2.getVariableRepresentative();
+                } else {
+                    v3 = v2;
+                }
+                if (normSet.contains(v3)) {
+                    newGbyList.add(p);
+                } else {
+                    changed = true;
+                    decorList.add(p);
+                }
+            } else {
+                newGbyList.add(p);
+            }
+        }
+        if (changed) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Group-by list changed from "
+                    + GroupJoinOperator.veListToString(gByList) + " to " + GroupJoinOperator.veListToString(newGbyList)
+                    + ".\n");
+        }
+        gByList.clear();
+        gByList.addAll(newGbyList);
+       return null;
+    }
+    
+    @Override
     public Void visitInnerJoinOperator(InnerJoinOperator op, IOptimizationContext ctx) throws AlgebricksException {
         Map<LogicalVariable, EquivalenceClass> equivalenceClasses = new HashMap<LogicalVariable, EquivalenceClass>();
         List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
@@ -612,8 +754,16 @@
         ctx.putFDList(op, fds);
     }
 
-    private LogicalVariable getNewGbyVar(GroupByOperator g, LogicalVariable v) {
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getGroupByList()) {
+    private LogicalVariable getNewGbyVar(AbstractLogicalOperator op, LogicalVariable v) {
+    	List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList;
+    	if(op.getOperatorTag() == LogicalOperatorTag.GROUP)
+    		groupByList = ((GroupByOperator)op).getGroupByList();
+    	else if(op.getOperatorTag() == LogicalOperatorTag.GROUPJOIN)
+    		groupByList = ((GroupJoinOperator)op).getGroupByList();
+    	else
+    		return null;
+
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : groupByList) {
             ILogicalExpression e = p.second.getValue();
             if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
                 LogicalVariable v2 = ((VariableReferenceExpression) e).getVariableReference();
@@ -625,8 +775,16 @@
         return null;
     }
 
-    private LogicalVariable getNewDecorVar(GroupByOperator g, LogicalVariable v) {
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getDecorList()) {
+    private LogicalVariable getNewDecorVar(AbstractLogicalOperator op, LogicalVariable v) {
+    	List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList;
+    	if(op.getOperatorTag() == LogicalOperatorTag.GROUP)
+    		decorList = ((GroupByOperator)op).getDecorList();
+    	else if(op.getOperatorTag() == LogicalOperatorTag.GROUPJOIN)
+    		decorList = ((GroupJoinOperator)op).getDecorList();
+    	else
+    		return null;
+
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
             ILogicalExpression e = p.second.getValue();
             if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
                 LogicalVariable v2 = ((VariableReferenceExpression) e).getVariableReference();
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 31061db..d5b577a 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -33,6 +33,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -42,6 +43,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -49,6 +51,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@@ -189,6 +192,17 @@
     }
 
     @Override
+    public Boolean visitGroupJoinOperator(GroupJoinOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.GROUPJOIN)
+            return Boolean.FALSE;
+        GroupJoinOperator joinOpArg = (GroupJoinOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = op.getCondition().getValue().equals(joinOpArg.getCondition().getValue());
+        return isomorphic;
+    }
+    
+    @Override
     public Boolean visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.INNERJOIN)
@@ -636,6 +650,27 @@
         }
 
         @Override
+        public ILogicalOperator visitGroupJoinOperator(GroupJoinOperator op, Void arg)
+                throws AlgebricksException {
+        	
+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+            ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
+            for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getGroupByList())
+                groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
+                        deepCopyExpressionRef(pair.second)));
+            for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getDecorList())
+                decorList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
+                        deepCopyExpressionRef(pair.second)));
+            for (ILogicalPlan plan : op.getNestedPlans()) {
+                newSubplans.add(IsomorphismOperatorVisitor.deepCopy(plan));
+            }
+            
+            return new GroupJoinOperator(op.getJoinKind(), deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
+                    .getInputs().get(1), groupByList, decorList, newSubplans);
+        }
+        
+        @Override
         public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
             return new InnerJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
                     .getInputs().get(1));
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 562bb4c..bd27939 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -28,10 +28,11 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractAssignOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -40,6 +41,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -96,7 +98,7 @@
     @Override
     public Void visitGroupByOperator(GroupByOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapChildren(op, arg);
-        mapVariablesForGroupBy(op, arg);
+        mapVariablesForGroupBy(op, arg, LogicalOperatorTag.GROUP);
         mapVariablesInNestedPlans(op, arg);
         return null;
     }
@@ -114,6 +116,14 @@
     }
 
     @Override
+    public Void visitGroupJoinOperator(GroupJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        mapVariablesForGroupBy(op, arg, LogicalOperatorTag.GROUPJOIN);
+        mapVariablesInNestedPlans(op, arg);
+        return null;
+    }
+    
+    @Override
     public Void visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
@@ -297,15 +307,27 @@
                 rightOp.getExpressions());
     }
 
-    private void mapVariablesForGroupBy(ILogicalOperator left, ILogicalOperator right) throws AlgebricksException {
-        GroupByOperator leftOp = (GroupByOperator) left;
-        GroupByOperator rightOp = (GroupByOperator) right;
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> leftPairs = leftOp.getGroupByList();
-        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> rightPairs = rightOp.getGroupByList();
-        mapVarExprPairList(leftPairs, rightPairs);
-        leftPairs = leftOp.getDecorList();
-        rightPairs = rightOp.getDecorList();
-        mapVarExprPairList(leftPairs, rightPairs);
+    private void mapVariablesForGroupBy(ILogicalOperator left, ILogicalOperator right, LogicalOperatorTag opTag) throws AlgebricksException {
+    	List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> lGByList;
+    	List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> rGByList;
+    	List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> lDecorList;
+    	List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> rDecorList;
+    	if(opTag == LogicalOperatorTag.GROUP){
+    		lGByList = ((GroupByOperator) left).getGroupByList();
+    		rGByList = ((GroupByOperator) right).getGroupByList();
+    		lDecorList = ((GroupByOperator) left).getDecorList();
+    		rDecorList = ((GroupByOperator) right).getDecorList();
+    	}
+        else if(opTag == LogicalOperatorTag.GROUPJOIN){
+    		lGByList = ((GroupJoinOperator) left).getGroupByList();
+    		rGByList = ((GroupJoinOperator) right).getGroupByList();
+    		lDecorList = ((GroupJoinOperator) left).getDecorList();
+    		rDecorList = ((GroupJoinOperator) right).getDecorList();
+    	}
+        else
+        	return;
+        mapVarExprPairList(lGByList, rGByList);
+        mapVarExprPairList(lDecorList, rDecorList);
     }
 
     private void mapVarExprPairList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> leftPairs,
@@ -349,8 +371,8 @@
     }
 
     private void mapVariablesInNestedPlans(ILogicalOperator opOrigin, ILogicalOperator arg) throws AlgebricksException {
-        AbstractOperatorWithNestedPlans op = (AbstractOperatorWithNestedPlans) opOrigin;
-        AbstractOperatorWithNestedPlans argOp = (AbstractOperatorWithNestedPlans) arg;
+        INestedPlan op = (INestedPlan) opOrigin;
+        INestedPlan argOp = (INestedPlan) arg;
         List<ILogicalPlan> plans = op.getNestedPlans();
         List<ILogicalPlan> plansArg = argOp.getNestedPlans();
         if (plans.size() != plansArg.size())
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 9b2f5a0..36ea800 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -122,6 +123,13 @@
     }
 
     @Override
+    public Void visitGroupJoinOperator(GroupJoinOperator op, IOptimizationContext arg)
+            throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+    
+    @Override
     public Void visitInnerJoinOperator(InnerJoinOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 994c6cb..406a4d7 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;

@@ -120,6 +121,26 @@
     }

 

     @Override

+    public Void visitGroupJoinOperator(GroupJoinOperator op, Void arg) throws AlgebricksException {

+        for (ILogicalPlan p : op.getNestedPlans()) {

+            for (Mutable<ILogicalOperator> r : p.getRoots()) {

+                VariableUtilities.getProducedVariables(r.getValue(), producedVariables);

+            }

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : op.getGroupByList()) {

+            if (p.first != null) {

+                producedVariables.add(p.first);

+            }

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : op.getDecorList()) {

+            if (p.first != null) {

+                producedVariables.add(p.first);

+            }

+        }

+        return null;

+    }

+    

+    @Override

     public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {

         return null;

     }

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 9295179..b72e427 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -24,8 +24,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;

 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

@@ -34,6 +36,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;

@@ -127,6 +130,32 @@
     }

 

     @Override

+    public Void visitGroupJoinOperator(GroupJoinOperator op, Void arg) throws AlgebricksException {

+        standardLayout(op);

+        for (ILogicalPlan p : op.getNestedPlans()) {

+            for (Mutable<ILogicalOperator> r : p.getRoots()) {

+                VariableUtilities.getLiveVariables(r.getValue(), schemaVariables);

+            }

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : op.getGroupByList()) {

+            if (p.first != null) {

+                schemaVariables.add(p.first);

+            }

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : op.getDecorList()) {

+            if (p.first != null) {

+                schemaVariables.add(p.first);

+            } else {

+                ILogicalExpression e = p.second.getValue();

+                if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {

+                    schemaVariables.add(((VariableReferenceExpression) e).getVariableReference());

+                }

+            }

+        }

+        return null;

+    }

+    

+    @Override

     public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {

         standardLayout(op);

         return null;

@@ -152,7 +181,12 @@
 

     @Override

     public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {

-        VariableUtilities.getLiveVariables(op.getSourceOperator(), schemaVariables);

+    	AbstractLogicalOperator sourceOp= (AbstractLogicalOperator)op.getDataSourceReference().getValue();

+    	if(sourceOp.getOperatorTag() == LogicalOperatorTag.GROUPJOIN){

+            VariableUtilities.getLiveVariables(op.getSourceOperator(1), schemaVariables);

+    	}

+    	else

+    		VariableUtilities.getLiveVariables(op.getSourceOperator(), schemaVariables);

         return null;

     }

 

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 11e56ca..c8d81ca 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -36,6 +36,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;

@@ -156,6 +157,22 @@
     }

 

     @Override

+    public Void visitGroupJoinOperator(GroupJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)

+            throws AlgebricksException {

+        subst(pair.first, pair.second, op.getGroupByList());

+        subst(pair.first, pair.second, op.getDecorList());

+        for (ILogicalPlan p : op.getNestedPlans()) {

+            for (Mutable<ILogicalOperator> r : p.getRoots()) {

+                OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), pair.first,

+                        pair.second, goThroughNts, ctx);

+            }

+        }

+        op.getCondition().getValue().substituteVar(pair.first, pair.second);

+        substVarTypes(op, pair);

+        return null;

+    }

+    

+    @Override

     public Void visitInnerJoinOperator(InnerJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)

             throws AlgebricksException {

         op.getCondition().getValue().substituteVar(pair.first, pair.second);

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 56487cc..250dbcb 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;

@@ -125,6 +126,23 @@
     }

 

     @Override

+    public Void visitGroupJoinOperator(GroupJoinOperator op, Void arg) throws AlgebricksException {

+        for (ILogicalPlan p : op.getNestedPlans()) {

+            for (Mutable<ILogicalOperator> r : p.getRoots()) {

+                VariableUtilities.getUsedVariablesInDescendantsAndSelf(r.getValue(), usedVariables);

+            }

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : op.getGroupByList()) {

+            g.second.getValue().getUsedVariables(usedVariables);

+        }

+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : op.getDecorList()) {

+            g.second.getValue().getUsedVariables(usedVariables);

+        }

+        op.getCondition().getValue().getUsedVariables(usedVariables);

+        return null;

+    }

+

+    @Override

     public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) {

         op.getCondition().getValue().getUsedVariables(usedVariables);

         return null;

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index c5f4c71..20141d2 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
@@ -88,7 +89,7 @@
     }
 
     protected AlgebricksPipeline[] compileSubplans(IOperatorSchema outerPlanSchema,
-            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext context)
+            INestedPlan npOp, IOperatorSchema opSchema, JobGenContext context)
             throws AlgebricksException {
         AlgebricksPipeline[] subplans = new AlgebricksPipeline[npOp.getNestedPlans().size()];
         PlanCompiler pc = new PlanCompiler(context);
@@ -100,7 +101,7 @@
     }
 
     private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema,
-            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
+            INestedPlan npOp, IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
         if (p.getRoots().size() > 1) {
             throw new NotImplementedException("Nested plans with several roots are not supported.");
         }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashGroupJoinPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashGroupJoinPOperator.java
new file mode 100644
index 0000000..105d104
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashGroupJoinPOperator.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.SerializableAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashGroupJoinOperatorDescriptor;
+
+public class HybridHashGroupJoinPOperator extends AbstractHashJoinPOperator {
+
+    private final int memSizeInFrames;
+    private final int maxInputBuildSizeInFrames;
+    private final int aveRecordsPerFrame;
+    private final double fudgeFactor;
+
+    public HybridHashGroupJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
+            int memSizeInFrames, int maxInputSize0InFrames, int aveRecordsPerFrame, double fudgeFactor) {
+        super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
+        this.memSizeInFrames = memSizeInFrames;
+        this.maxInputBuildSizeInFrames = maxInputSize0InFrames;
+        this.aveRecordsPerFrame = aveRecordsPerFrame;
+        this.fudgeFactor = fudgeFactor;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.HYBRID_HASH_GROUP_JOIN;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    public double getFudgeFactor() {
+        return fudgeFactor;
+    }
+
+    public int getMemSizeInFrames() {
+        return memSizeInFrames;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        GroupJoinOperator opLogical = (GroupJoinOperator) op;
+        int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
+        int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        
+        INullWriterFactory[] nullWriterFactories;
+        switch (kind) {
+        case INNER: {
+        	nullWriterFactories = null;
+            break;
+        	}
+        case LEFT_OUTER: {
+            Collection<LogicalVariable> nestedProdVars = new HashSet<LogicalVariable>();
+            for(ILogicalPlan p : opLogical.getNestedPlans()){
+            	VariableUtilities.getProducedVariables(p.getRoots().get(0).getValue(), nestedProdVars);
+            }
+            nullWriterFactories = new INullWriterFactory[nestedProdVars.size()];
+            for (int j = 0; j < nullWriterFactories.length; j++) {
+                nullWriterFactories[j] = context.getNullWriterFactory();
+            }
+            break;
+        	}
+        default: {
+            throw new NotImplementedException();
+        	}
+        }
+
+        IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
+                keysLeftBranch, env, context);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
+        int i = 0;
+        IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+        for (LogicalVariable v : keysLeftBranch) {
+            Object t = env.getVarType(v);
+            comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+        }
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        IOperatorDescriptor opDesc = null;
+        
+        int numDecors = opLogical.getDecorList().size();
+        int[] decorKeys = new int[numDecors];
+        int j = 0;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : opLogical.getDecorList()) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new AlgebricksException("groupjoin expects variable references.");
+            }
+            VariableReferenceExpression v = (VariableReferenceExpression) expr;
+            LogicalVariable decor = v.getVariableReference();
+            decorKeys[j++] = inputSchemas[0].findVariable(decor);
+        }
+        
+        int n = 0;
+        AggregateOperator aggOp;
+        List<AggregateOperator> aggOpList = new LinkedList<AggregateOperator>();
+        AbstractLogicalOperator nestedOp;
+
+        for(Mutable<ILogicalOperator> nOp : opLogical.getNestedPlans().get(0).getRoots()) {
+        	nestedOp = (AbstractLogicalOperator) nOp.getValue();
+        	if(nestedOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                aggOp = (AggregateOperator) nestedOp;
+                n += aggOp.getExpressions().size();
+                aggOpList.add(aggOp);
+        	}
+        }
+        
+        ICopySerializableAggregateFunctionFactory[] aff = new ICopySerializableAggregateFunctionFactory[n];
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+//        ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
+        IVariableTypeEnvironment aggOpInputEnv;
+//        IVariableTypeEnvironment outputEnv = context.getTypeEnvironment(op);
+        compileSubplans(inputSchemas[1], opLogical, propagatedSchema, context);
+        i=0;
+        for(AggregateOperator a : aggOpList) {
+        	aggOpInputEnv = context.getTypeEnvironment(a.getInputs().get(0).getValue());
+            for (Mutable<ILogicalExpression> exprRef : a.getExpressions()) {
+                AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) exprRef.getValue();
+                aff[i++] = expressionRuntimeProvider.createSerializableAggregateFunctionFactory(aggFun, aggOpInputEnv, inputSchemas,
+                        context);
+//                intermediateTypes.add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv,
+//                        context.getMetadataProvider()));
+            }
+        }
+        IAggregatorDescriptorFactory aggregatorFactory = new SerializableAggregatorDescriptorFactory(aff);
+        
+        try{
+        opDesc = new HybridHashGroupJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames, aveRecordsPerFrame,
+                fudgeFactor, keysLeft, keysRight, decorKeys, hashFunFactories, comparatorFactories, comparatorFactories,
+                /* aggregatorFactory */ aggregatorFactory, recDescriptor, opLogical.getJoinKind() == JoinKind.LEFT_OUTER,
+                nullWriterFactories);
+        } catch(HyracksDataException e){
+        	throw new AlgebricksException(e.getMessage());
+        }
+
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        ILogicalOperator src1 = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src1, 0, op, 0);
+        ILogicalOperator src2 = op.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(src2, 0, op, 1);
+    }
+
+    @Override
+    protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op0 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector pv0 = op0.getPhysicalOperator().getDeliveredProperties();
+        List<ILocalStructuralProperty> lp0 = pv0.getLocalProperties();
+        if (lp0 != null) {
+            // maintains the local properties on the probe side
+            return new LinkedList<ILocalStructuralProperty>(lp0);
+        }
+        return new LinkedList<ILocalStructuralProperty>();
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashGroupJoinPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashGroupJoinPOperator.java
new file mode 100644
index 0000000..ccbbe0b
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashGroupJoinPOperator.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.SerializableAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashGroupJoinOperatorDescriptor;
+
+public class InMemoryHashGroupJoinPOperator extends AbstractHashJoinPOperator {
+
+    private final int tableSize;
+
+    /**
+     * builds on the first operator and probes on the second.
+     */
+
+    public InMemoryHashGroupJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize) {
+        super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
+        this.tableSize = tableSize;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.IN_MEMORY_HASH_GROUP_JOIN;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        GroupJoinOperator opLogical = (GroupJoinOperator) op;
+        int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
+        int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        
+        INullWriterFactory[] nullWriterFactories;
+        switch (kind) {
+        case INNER: {
+        	nullWriterFactories = null;
+            break;
+        	}
+        case LEFT_OUTER: {
+            Collection<LogicalVariable> nestedProdVars = new HashSet<LogicalVariable>();
+            for(ILogicalPlan p : opLogical.getNestedPlans()){
+            	VariableUtilities.getProducedVariables(p.getRoots().get(0).getValue(), nestedProdVars);
+            }
+            nullWriterFactories = new INullWriterFactory[nestedProdVars.size()];
+            for (int j = 0; j < nullWriterFactories.length; j++) {
+                nullWriterFactories[j] = context.getNullWriterFactory();
+            }
+            break;
+        	}
+        default: {
+            throw new NotImplementedException();
+        	}
+        }
+
+        IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
+                keysLeftBranch, env, context);
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
+        int i = 0;
+        IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+        for (LogicalVariable v : keysLeftBranch) {
+            Object t = env.getVarType(v);
+            comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+        }
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        IOperatorDescriptor opDesc = null;
+        
+        int numDecors = opLogical.getDecorList().size();
+        int[] decorKeys = new int[numDecors];
+        int j = 0;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : opLogical.getDecorList()) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new AlgebricksException("groupjoin expects variable references.");
+            }
+            VariableReferenceExpression v = (VariableReferenceExpression) expr;
+            LogicalVariable decor = v.getVariableReference();
+            decorKeys[j++] = inputSchemas[0].findVariable(decor);
+        }
+
+/*        if (opLogical.getNestedPlans().size() != 1) {
+            throw new AlgebricksException(
+                    "External group-by currently works only for one nested plan with one root containing"
+                            + "an aggregate and a nested-tuple-source.");
+        }
+        ILogicalPlan p0 = opLogical.getNestedPlans().get(0);
+        if (p0.getRoots().size() != 1) {
+            throw new AlgebricksException(
+                    "External group-by currently works only for one nested plan with one root containing"
+                            + "an aggregate and a nested-tuple-source.");
+        }
+*/
+        
+        int n = 0;
+        AggregateOperator aggOp;
+        List<AggregateOperator> aggOpList = new LinkedList<AggregateOperator>();
+        AbstractLogicalOperator nestedOp;
+
+        for(Mutable<ILogicalOperator> nOp : opLogical.getNestedPlans().get(0).getRoots()) {
+        	nestedOp = (AbstractLogicalOperator) nOp.getValue();
+        	if(nestedOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                aggOp = (AggregateOperator) nestedOp;
+                n += aggOp.getExpressions().size();
+                aggOpList.add(aggOp);
+        	}
+        }
+        
+        ICopySerializableAggregateFunctionFactory[] aff = new ICopySerializableAggregateFunctionFactory[n];
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+//        ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
+        IVariableTypeEnvironment aggOpInputEnv;
+//        IVariableTypeEnvironment outputEnv = context.getTypeEnvironment(op);
+        compileSubplans(inputSchemas[1], opLogical, propagatedSchema, context);
+        i=0;
+        for(AggregateOperator a : aggOpList) {
+        	aggOpInputEnv = context.getTypeEnvironment(a.getInputs().get(0).getValue());
+            for (Mutable<ILogicalExpression> exprRef : a.getExpressions()) {
+                AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) exprRef.getValue();
+                aff[i++] = expressionRuntimeProvider.createSerializableAggregateFunctionFactory(aggFun, aggOpInputEnv, inputSchemas,
+                        context);
+//                intermediateTypes.add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv,
+//                        context.getMetadataProvider()));
+            }
+        }
+        IAggregatorDescriptorFactory aggregatorFactory = new SerializableAggregatorDescriptorFactory(aff);
+        opDesc = new InMemoryHashGroupJoinOperatorDescriptor(spec, keysLeft, keysRight, decorKeys, hashFunFactories,
+        		comparatorFactories, comparatorFactories, /* aggregatorFactory */ aggregatorFactory, recDescriptor,
+        		opLogical.getJoinKind() == JoinKind.LEFT_OUTER, nullWriterFactories, tableSize);
+
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        ILogicalOperator src1 = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src1, 0, op, 0);
+        ILogicalOperator src2 = op.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(src2, 0, op, 1);
+    }
+
+    @Override
+    protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op0 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector pv0 = op0.getPhysicalOperator().getDeliveredProperties();
+        List<ILocalStructuralProperty> lp0 = pv0.getLocalProperties();
+        if (lp0 != null) {
+            // maintains the local properties on the probe side
+            return new LinkedList<ILocalStructuralProperty>(lp0);
+        }
+        return new LinkedList<ILocalStructuralProperty>();
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index a94c78e..7ab457c 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint;
 
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -23,7 +24,10 @@
 import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -33,6 +37,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -55,6 +60,9 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisitor<String, Integer> {
@@ -104,6 +112,15 @@
     }
 
     @Override
+    public String visitGroupJoinOperator(GroupJoinOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("group join (").append(op.getCondition().getValue()).append(") gby (")
+        .append(op.gByListToString()).append(") decor (").append(op.decorListToString()).append(") {");
+        printNestedPlans(op, indent, buffer);
+        return buffer.toString();
+    }
+    
+    @Override
     public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("join (").append(op.getCondition().getValue()).append(")");
@@ -268,7 +285,7 @@
         return buffer;
     }
 
-    private void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent, StringBuilder buffer)
+    private void printNestedPlans(INestedPlan op, Integer indent, StringBuilder buffer)
             throws AlgebricksException {
         boolean first = true;
         if (op.getNestedPlans().isEmpty()) {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
index aa96513..7265626 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;

@@ -61,7 +62,7 @@
         pad(out, indent);

         appendln(out, "-- " + pOp.toString() + "  |" + op.getExecutionMode() + "|");

         if (op.hasNestedPlans()) {

-            AbstractOperatorWithNestedPlans opNest = (AbstractOperatorWithNestedPlans) op;

+            INestedPlan opNest = (INestedPlan) op;

             for (ILogicalPlan p : opNest.getNestedPlans()) {

                 pad(out, indent + 8);

                 appendln(out, "{");

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 47a979c..d4ee3be 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -22,11 +22,11 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -148,7 +148,7 @@
             }
         }
         if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans aonp = (AbstractOperatorWithNestedPlans) op;
+            INestedPlan aonp = (INestedPlan) op;
             for (ILogicalPlan p : aonp.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> ref : p.getRoots()) {
                     AbstractLogicalOperator aop = (AbstractLogicalOperator) ref.getValue();
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index cd8f042..05d8432 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -83,7 +84,7 @@
         }
 
         if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans s = (AbstractOperatorWithNestedPlans) op;
+            INestedPlan s = (INestedPlan) op;
             for (ILogicalPlan p : s.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
                     getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) r.getValue(), freeVars);
@@ -99,7 +100,7 @@
         }
     }
 
-    public static void getFreeVariablesInSubplans(AbstractOperatorWithNestedPlans op, Set<LogicalVariable> freeVars)
+    public static void getFreeVariablesInSubplans(INestedPlan op, Set<LogicalVariable> freeVars)
             throws AlgebricksException {
         for (ILogicalPlan p : op.getNestedPlans()) {
             for (Mutable<ILogicalOperator> r : p.getRoots()) {
@@ -127,7 +128,7 @@
                 computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator) i.getValue(), context);
             }
             if (op.hasNestedPlans()) {
-                AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+                INestedPlan a = (INestedPlan) op;
                 for (ILogicalPlan p : a.getNestedPlans()) {
                     for (Mutable<ILogicalOperator> r : p.getRoots()) {
                         computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator) r.getValue(), context);
@@ -145,7 +146,7 @@
                 computeSchemaRecIfNull((AbstractLogicalOperator) i.getValue());
             }
             if (op.hasNestedPlans()) {
-                AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+                INestedPlan a = (INestedPlan) op;
                 for (ILogicalPlan p : a.getNestedPlans()) {
                     for (Mutable<ILogicalOperator> r : p.getRoots()) {
                         computeSchemaRecIfNull((AbstractLogicalOperator) r.getValue());
@@ -195,7 +196,7 @@
             typeOpRec(i, context);
         }
         if (op.hasNestedPlans()) {
-            for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+            for (ILogicalPlan p : ((INestedPlan) op).getNestedPlans()) {
                 typePlan(p, context);
             }
         }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 6b5949e..6f4ab15 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -57,6 +58,8 @@
 
     public R visitLimitOperator(LimitOperator op, T arg) throws AlgebricksException;
 
+    public R visitGroupJoinOperator(GroupJoinOperator op, T arg) throws AlgebricksException;
+    
     public R visitInnerJoinOperator(InnerJoinOperator op, T arg) throws AlgebricksException;
 
     public R visitLeftOuterJoinOperator(LeftOuterJoinOperator op, T arg) throws AlgebricksException;
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
index 3382c6e..251439d 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
@@ -102,7 +103,7 @@
         }
 
         if (op.hasNestedPlans() && enterNestedPlans) {
-            AbstractOperatorWithNestedPlans o2 = (AbstractOperatorWithNestedPlans) op;
+            INestedPlan o2 = (INestedPlan) op;
             for (ILogicalPlan p : o2.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
                     if (rewriteOperatorRef(r, rule, enterNestedPlans, fullDFS)) {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index e6b296b..7611987 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -8,6 +8,7 @@
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -22,6 +23,7 @@
             PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
             PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
             PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
+            PhysicalOperatorTag.HYBRID_HASH_GROUP_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_GROUP_JOIN,
             PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
             PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.SPLIT, PhysicalOperatorTag.STABLE_SORT,
             PhysicalOperatorTag.UNION_ALL };
@@ -92,7 +94,7 @@
             computeSchemaBottomUpForOp((AbstractLogicalOperator) i.getValue());
         }
         if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+            INestedPlan a = (INestedPlan) op;
             for (ILogicalPlan p : a.getNestedPlans()) {
                 computeSchemaBottomUpForPlan(p);
             }
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index de2e6af..2954233 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -16,6 +16,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -233,7 +234,7 @@
         }
 
         if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
+            INestedPlan nested = (INestedPlan) op;
             for (ILogicalPlan p : nested.getNestedPlans()) {
                 if (physOptimizePlan(p, required, true, context)) {
                     changed = true;
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 8a79d81..a31d7dd 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -113,7 +114,7 @@
             ++cnt;
         }
         if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans n = (AbstractOperatorWithNestedPlans) op;
+            INestedPlan n = (INestedPlan) op;
             List<EquivalenceClass> eqc = equivClasses;
             if (n.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
                 eqc = new LinkedList<EquivalenceClass>();
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
index f223844..5829ecb 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -146,7 +147,7 @@
             }
         }
         if (op2.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans n = (AbstractOperatorWithNestedPlans) op2;
+            INestedPlan n = (INestedPlan) op2;
             for (ILogicalPlan p : n.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
                     if (pushNeededProjections(toPush, r, context, initialOp)) {
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReinferAllTypesRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReinferAllTypesRule.java
index e18a380..6dd045a 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReinferAllTypesRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReinferAllTypesRule.java
@@ -5,6 +5,7 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
@@ -39,7 +40,7 @@
             typeOpRec(i, context);
         }
         if (op.hasNestedPlans()) {
-            for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+            for (ILogicalPlan p : ((INestedPlan) op).getNestedPlans()) {
                 typePlan(p, context);
             }
         }
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index c53ea0a..24dbad9 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -26,6 +26,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -74,7 +75,7 @@
             removeUnusedAssigns(cRef, toRemove, context);
         }
         if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans opWithNest = (AbstractOperatorWithNestedPlans) op;
+            INestedPlan opWithNest = (INestedPlan) op;
             Iterator<ILogicalPlan> planIter = opWithNest.getNestedPlans().iterator();
             while (planIter.hasNext()) {
                 ILogicalPlan p = planIter.next();
@@ -135,7 +136,7 @@
             collectUnusedAssignedVars((AbstractLogicalOperator) c.getValue(), toRemove, false, context);
         }
         if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans opWithNested = (AbstractOperatorWithNestedPlans) op;
+            INestedPlan opWithNested = (INestedPlan) op;
             for (ILogicalPlan plan : opWithNested.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r : plan.getRoots()) {
                     collectUnusedAssignedVars((AbstractLogicalOperator) r.getValue(), toRemove, false, context);
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReplaceJoinGroupWithGroupJoinRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReplaceJoinGroupWithGroupJoinRule.java
new file mode 100644
index 0000000..4755347
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReplaceJoinGroupWithGroupJoinRule.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions.ComparisonKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class ReplaceJoinGroupWithGroupJoinRule implements IAlgebraicRewriteRule {
+	Logger LOGGER = AlgebricksConfig.ALGEBRICKS_LOGGER;
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+    	try{
+    	// Check if first op is GROUP
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        LOGGER.finest("GROUPJOIN_REPLACE: " + op.getOperatorTag().toString() + " " + op.hashCode());
+        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+        	LOGGER.finest("GROUPJOIN_REPLACE: " + "Not Group");
+            return false;
+        }
+        
+        // Check if Group-By contains Aggregate Op
+        GroupByOperator gby = (GroupByOperator) op;
+        for(ILogicalPlan p : gby.getNestedPlans()){
+        	for(Mutable<ILogicalOperator> o : p.getRoots()){
+        		AbstractLogicalOperator nestedOp = (AbstractLogicalOperator) o.getValue();
+        		boolean isOpAggOrNTS = false;
+        		if(nestedOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE){
+        			AggregateOperator aggOp = (AggregateOperator)nestedOp;
+        	        if (aggOp.getVariables().size() < 1) {
+        	        	LOGGER.finest("GROUPJOIN_REPLACE: " + "No Agg variables!");
+        	            return false;
+        	        }
+    	            List<Mutable<ILogicalExpression>> expressions = aggOp.getExpressions();
+    	            for (int i = 0; i < expressions.size(); i++) {
+    	                AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) expressions.get(i).getValue();
+    	                if (aggFun.isTwoStep()){
+    	                	LOGGER.finest("GROUPJOIN_REPLACE: " + "TwoStep Agg");
+    	                	//return false;
+    	                }
+        	        }
+        			isOpAggOrNTS = true;
+        		}
+        		if(nestedOp.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE || nestedOp.getOperatorTag() == LogicalOperatorTag.ASSIGN)
+        			isOpAggOrNTS = true;
+        		
+        		if(!isOpAggOrNTS)
+        			return false;
+        	}
+        	
+        }
+        
+    	// Check if child op is a JOIN
+        Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
+        AbstractLogicalOperator childOfOp = (AbstractLogicalOperator) opRef2.getValue();
+        LOGGER.finest("GROUPJOIN_REPLACE: " + childOfOp.getOperatorTag().toString() + " " + childOfOp.hashCode());
+    	
+    	if (childOfOp.getOperatorTag() == LogicalOperatorTag.ASSIGN){
+            opRef2 = childOfOp.getInputs().get(0);
+            childOfOp = (AbstractLogicalOperator) opRef2.getValue();
+            LOGGER.finest("GROUPJOIN_REPLACE: " + childOfOp.getOperatorTag().toString() + " " + childOfOp.hashCode());
+    	}
+    		
+        if (childOfOp.getOperatorTag() != LogicalOperatorTag.INNERJOIN
+                && childOfOp.getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) {
+        	LOGGER.finest("GROUPJOIN_REPLACE: " + "Not Join");
+            return false;
+        }
+        
+        
+        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) opRef2.getValue();
+        Mutable<ILogicalOperator> joinBranchLeftRef = join.getInputs().get(0);
+        Mutable<ILogicalOperator> joinBranchRightRef = join.getInputs().get(1);
+        Collection<LogicalVariable> joinBranchLeftVars = new HashSet<LogicalVariable>();
+        Collection<LogicalVariable> joinBranchRightVars = new HashSet<LogicalVariable>();
+        List<ILogicalExpression> exprs = new LinkedList<ILogicalExpression>();
+        exprs.add(join.getCondition().getValue());
+        VariableUtilities.getLiveVariables(joinBranchRightRef.getValue(), joinBranchRightVars);
+
+    	// Check if join condition is EQ
+        ILogicalExpression e;
+        AbstractFunctionCallExpression fexp;
+        FunctionIdentifier fi;
+        ComparisonKind ck;
+        while(!exprs.isEmpty()) {
+	        e = exprs.get(0);
+        	switch (e.getExpressionTag()) {
+	        case FUNCTION_CALL: {
+	            fexp = (AbstractFunctionCallExpression) e;
+	            fi = fexp.getFunctionIdentifier();
+	        	
+	            if (fi.equals(AlgebricksBuiltinFunctions.AND)) {
+	                for (Mutable<ILogicalExpression> a : fexp.getArguments()) {
+	                	exprs.add(a.getValue());
+	                }
+	            }
+	            else {
+	                ck = AlgebricksBuiltinFunctions.getComparisonType(fi);
+	                if (ck != ComparisonKind.EQ) {
+	                	LOGGER.finest("GROUPJOIN_REPLACE: " + "Not HashJoin1");
+	                    return false;
+	                }
+	                ILogicalExpression opLeft = fexp.getArguments().get(0).getValue();
+	                ILogicalExpression opRight = fexp.getArguments().get(1).getValue();
+	                if (opLeft.getExpressionTag() != LogicalExpressionTag.VARIABLE
+	                        || opRight.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+	                	LOGGER.finest("GROUPJOIN_REPLACE: " + "Not HashJoin2");
+	                    return false;
+	                }
+/*	                LogicalVariable var1 = ((VariableReferenceExpression) opLeft).getVariableReference();
+	                if(inLeftAll == null)
+	                	throw new AlgebricksException("inLeftAll is null");
+	                if (inLeftAll.contains(var1)) {
+	                    if (!outLeftFields.contains(var1))
+	                    	outLeftFields.add(var1);
+	                } else if (inRightAll.contains(var1) && !outRightFields.contains(var1)) {
+	                    outRightFields.add(var1);
+	                } else {
+	                    return false;
+	                }
+	                LogicalVariable var2 = ((VariableReferenceExpression) opRight).getVariableReference();
+	                if (inLeftAll.contains(var2) && !outLeftFields.contains(var2)) {
+	                    outLeftFields.add(var2);
+	                } else if (inRightAll.contains(var2) && !outRightFields.contains(var2)) {
+	                    outRightFields.add(var2);
+	                } else {
+	                    return false;
+	                }
+	                return true;
+*/
+	                }
+            	exprs.remove(0);
+                break;
+	            }
+	        default: {
+	        	LOGGER.finest("GROUPJOIN_REPLACE: " + "Not HashJoin3");
+	        	return false;
+	        	}
+	        }
+        }
+
+        // check: 1) PK(or FD on PK) used in join; 2) if Group-By vars are in join expression
+        Collection<LogicalVariable> joinVars = new HashSet<LogicalVariable>();
+        List<LogicalVariable> groupVars = gby.getGbyVarList();
+        join.getCondition().getValue().getUsedVariables(joinVars);
+        
+        joinVars.removeAll(joinBranchRightVars);
+         // 1. PK
+        boolean pkExists = false;
+        for (LogicalVariable v : joinVars) {
+        	if(context.findPrimaryKey(v) != null) {
+        		pkExists = true;
+        		break;
+        	}
+        }
+        if (!pkExists){
+        	LOGGER.finest("GROUPJOIN_REPLACE: " + "No PK");
+//        	return false;
+        }
+  	
+    	 // 2. Group-By vars
+        // check groupVars.size = joinVars.size
+        if (groupVars.size() != joinVars.size()){
+        	LOGGER.finest("GROUPJOIN_REPLACE: " + "Not |GroupVars| == |JoinVars| 1");
+        	return false;
+        }
+        //List
+        List<FunctionalDependency> gByFDList = null;
+        for (LogicalVariable v : groupVars){
+        	if (!joinVars.remove(v)){
+        		LOGGER.finest("GROUPJOIN_REPLACE: " + "Not |GroupVars| == |JoinVars| 2");
+        		LOGGER.finest("GROUPJOIN_REPLACE: " + v.getId());
+            	if(context.findPrimaryKey(v) == null)
+            		return false;
+        	}
+        }
+        if (!joinVars.isEmpty()){
+        	LOGGER.finest("GROUPJOIN_REPLACE: " + "Not |GroupVars| == |JoinVars| 3");
+        	return false;
+        }
+        // check if Aggregate vars are in join right branch
+    	Collection<LogicalVariable> aggrVariables = new HashSet<LogicalVariable>();
+        for (ILogicalPlan p : gby.getNestedPlans()){
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                AbstractLogicalOperator op1Abs = (AbstractLogicalOperator) r.getValue();
+                if (op1Abs.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                	AggregateOperator op1 = (AggregateOperator) op1Abs;
+                	//Collection<LogicalVariable> usedVariables = new HashSet<LogicalVariable>();
+                	VariableUtilities.getUsedVariables(op1, aggrVariables);
+                }
+            }
+        }
+    	if (!joinBranchRightVars.containsAll(aggrVariables)){
+    		LOGGER.finest("GROUPJOIN_REPLACE: " + "Not AggrVar in Right branch");
+    		return false;
+    	}
+        // create GroupJoin op and replace in plan
+        // make same as merge of Join and Group constructors
+        
+        List<ILogicalPlan> nestedPlans = gby.getNestedPlans();
+        context.computeAndSetTypeEnvironmentForOperator(joinBranchLeftRef.getValue());
+        context.computeAndSetTypeEnvironmentForOperator(joinBranchRightRef.getValue());
+        GroupJoinOperator groupJoinOp = new GroupJoinOperator(join.getJoinKind(), join.getCondition(), joinBranchLeftRef, joinBranchRightRef,
+        		gby.getGroupByList(), gby.getDecorList(), nestedPlans);
+        boolean foundNTS = true;
+        for(ILogicalPlan p : nestedPlans){
+        	Mutable<ILogicalOperator> oRef = p.getRoots().get(0);
+        	AbstractLogicalOperator absOp = (AbstractLogicalOperator)oRef.getValue();
+        	foundNTS &= updateNTSInNestedPlan(absOp, groupJoinOp, context);
+/*        	if(absOp.getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE){
+        		absOp = (AbstractLogicalOperator)absOp.getInputs().get(0).getValue();
+        	}
+        	else{
+                NestedTupleSourceOperator nts = (NestedTupleSourceOperator)absOp;
+                nts.getDataSourceReference().setValue(groupJoinOp);
+                context.computeAndSetTypeEnvironmentForOperator(nts);
+                foundNTS = true;
+                AggregateOperator agg = (AggregateOperator)absOp;
+                List<Mutable<ILogicalOperator>> aggInpList = agg.getInputs();
+                aggInpList.clear();
+                aggInpList.add(new MutableObject<ILogicalOperator>(nts));
+                ILogicalPlan np1 = new ALogicalPlanImpl(oRef);
+                nestedPlans.add(np1);
+                context.computeAndSetTypeEnvironmentForOperator(agg);
+                OperatorPropertiesUtil.typeOpRec(oRef, context);
+        	}
+*/        }
+        if(!foundNTS)
+        	return false;
+       
+        groupJoinOp.setNestedPlans(nestedPlans);
+        
+        // context.addToDontApplySet(new IntroduceCombinerRule(), groupJoinOp);
+        context.computeAndSetTypeEnvironmentForOperator(groupJoinOp);
+        
+        LOGGER.finest("GROUPJOIN_REPLACE: GroupJoin nestedPlans size = " + groupJoinOp.getNestedPlans().size());
+        
+        for (ILogicalPlan p : groupJoinOp.getNestedPlans()){
+            LOGGER.finest("GROUPJOIN_REPLACE: GroupJoin nestedPlan root size = " + p.getRoots().size());
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                AbstractLogicalOperator op1Abs = (AbstractLogicalOperator) r.getValue();
+                if (op1Abs != null) {
+                    LOGGER.finest("GROUPJOIN_REPLACE: GroupJoin nestedOp = " + op1Abs.getOperatorTag().name());
+                	Collection<LogicalVariable> vars = new HashSet<LogicalVariable>();
+                	VariableUtilities.getLiveVariables(op1Abs, vars);
+                	for(LogicalVariable v : vars){
+                        LOGGER.finest("\t** GROUPJOIN_REPLACE: var = " + v.getId());
+                	}
+                	for(Mutable<ILogicalOperator> mo: op1Abs.getInputs()){
+                        AbstractLogicalOperator childAbs = (AbstractLogicalOperator) mo.getValue();
+                        LOGGER.finest("\t** GROUPJOIN_REPLACE: childOp = " + childAbs.getOperatorTag().name());
+                    	Collection<LogicalVariable> childVars = new HashSet<LogicalVariable>();
+                    	VariableUtilities.getLiveVariables(childAbs, childVars);
+                    	for(LogicalVariable v : vars){
+                            LOGGER.finest("\t\t** GROUPJOIN_REPLACE: var = " + v.getId());
+                    	}
+                	}
+                }
+            }
+        }
+
+        opRef.setValue(groupJoinOp);
+
+        LOGGER.finest("GROUPJOIN_REPLACE: " + "GroupJoin match");
+        return true;
+    	} catch (Exception e){
+    		LOGGER.finest("GROUPJOIN_REPLACE: Error = " + e.getMessage());
+    		return false;
+    	}
+    }
+    
+    private boolean updateNTSInNestedPlan(AbstractLogicalOperator op,
+    		GroupJoinOperator gjOp, IOptimizationContext context) throws AlgebricksException{
+    	LOGGER.finest("GROUPJOIN_REPLACE: " + "NestedPlan \t" + op.getOperatorTag().toString());
+    	if(op.getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE){
+    		boolean foundNTS = updateNTSInNestedPlan((AbstractLogicalOperator)op.getInputs().get(0).getValue(), gjOp, context);
+            context.computeAndSetTypeEnvironmentForOperator(op);
+            return foundNTS;
+    	}
+    	else{
+            NestedTupleSourceOperator nts = (NestedTupleSourceOperator)op;
+            nts.getDataSourceReference().setValue(gjOp);
+            context.computeAndSetTypeEnvironmentForOperator(nts);
+            return true;
+        	}
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 38cf96e..e9a1034 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -12,6 +12,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -27,6 +28,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -63,6 +65,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.WriteResultPOperator;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.rewriter.util.GroupJoinUtils;
 import edu.uci.ics.hyracks.algebricks.rewriter.util.JoinUtils;
 
 public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule {
@@ -163,6 +166,11 @@
                     }
                     break;
                 }
+                case GROUPJOIN: {
+                    GroupJoinUtils.setJoinAlgorithmAndExchangeAlgo((GroupJoinOperator) op, context);
+//                	System.exit(0);
+                	break;
+                }
                 case INNERJOIN: {
                     JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
                     break;
@@ -282,7 +290,7 @@
             }
         }
         if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
+            INestedPlan nested = (INestedPlan) op;
             for (ILogicalPlan p : nested.getNestedPlans()) {
                 setPhysicalOperators(p, false, context);
             }
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/GroupJoinUtils.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/GroupJoinUtils.java
new file mode 100644
index 0000000..beccf45
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/GroupJoinUtils.java
@@ -0,0 +1,218 @@
+/*

+ * Copyright 2009-2010 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ * 

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package edu.uci.ics.hyracks.algebricks.rewriter.util;

+

+import java.util.ArrayList;

+import java.util.Collection;

+import java.util.LinkedList;

+import java.util.List;

+

+import org.apache.commons.lang3.mutable.Mutable;

+

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;

+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions.ComparisonKind;

+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalPropertiesVisitor;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HybridHashGroupJoinPOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HybridHashJoinPOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashGroupJoinPOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashJoinPOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.NLJoinPOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;

+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;

+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;

+

+public class GroupJoinUtils {

+

+    private final static int MB = 1048576;

+

+    private final static double DEFAULT_FUDGE_FACTOR = 1.3;

+    private final static int MAX_RECORDS_PER_FRAME = 512;

+    private final static int DEFAULT_FRAME_SIZE = 32768;

+    private final static int MAX_LEFT_INPUT_SIZE_HYBRID_HASH = (int) (140L * 1024 * MB / DEFAULT_FRAME_SIZE);

+    private final static int DEFAULT_MEMORY_SIZE_HYBRID_HASH = (int) (256L * MB / DEFAULT_FRAME_SIZE);

+

+    public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context)

+            throws AlgebricksException {

+        List<LogicalVariable> sideLeft = new LinkedList<LogicalVariable>();

+        List<LogicalVariable> sideRight = new LinkedList<LogicalVariable>();

+        List<LogicalVariable> varsLeft = op.getInputs().get(0).getValue().getSchema();

+        List<LogicalVariable> varsRight = op.getInputs().get(1).getValue().getSchema();

+        if (isHashJoinCondition(op.getCondition().getValue(), varsLeft, varsRight, sideLeft, sideRight)) {

+            BroadcastSide side = getBroadcastJoinSide(op.getCondition().getValue(), varsLeft, varsRight);

+            if (side == null) {

+                setHashGroupJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);

+            } else {

+                switch (side) {

+                    case RIGHT:

+                        setHashGroupJoinOp(op, JoinPartitioningType.BROADCAST, sideLeft, sideRight, context);

+                        break;

+                    case LEFT:

+                        //Mutable<ILogicalOperator> opRef0 = op.getInputs().get(0);

+                        //Mutable<ILogicalOperator> opRef1 = op.getInputs().get(1);

+                        //ILogicalOperator tmp = opRef0.getValue();

+                        //opRef0.setValue(opRef1.getValue());

+                        //opRef1.setValue(tmp);

+                        setHashGroupJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);

+                        break;

+                    default:

+                        setHashGroupJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);

+                }

+            }

+        } else {

+            throw new AlgebricksException("Not a hash join: Group Join is not suitable");

+        }

+    }

+

+    private static void setHashGroupJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType,

+            List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context)

+            throws AlgebricksException {

+    	op.setPhysicalOperator(new HybridHashGroupJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight,

+                DEFAULT_MEMORY_SIZE_HYBRID_HASH, MAX_LEFT_INPUT_SIZE_HYBRID_HASH, MAX_RECORDS_PER_FRAME,

+                DEFAULT_FUDGE_FACTOR));

+        if (partitioningType == JoinPartitioningType.BROADCAST) {

+            hybridToInMemHashGroupJoin(op, context);

+        }

+        // op.setPhysicalOperator(new

+        // InMemoryHashJoinPOperator(op.getJoinKind(), partitioningType,

+        // sideLeft, sideRight,

+        // 1024 * 512));

+    }

+

+    private static void hybridToInMemHashGroupJoin(AbstractBinaryJoinOperator op, IOptimizationContext context)

+            throws AlgebricksException {

+        ILogicalOperator opBuild = op.getInputs().get(0).getValue();

+        LogicalPropertiesVisitor.computeLogicalPropertiesDFS(opBuild, context);

+        ILogicalPropertiesVector v = context.getLogicalPropertiesVector(opBuild);

+        AlgebricksConfig.ALGEBRICKS_LOGGER.fine("// HybridHashGroupJoin inner branch -- Logical properties for " + opBuild

+                + ": " + v + "\n");

+        if (v != null) {

+            int size2 = v.getMaxOutputFrames();

+            // Yingyi - Need a way to calculate memory taken by aggregators. For now, it is set as a factor = 1

+            int aggregatorSize = 1;

+            HybridHashGroupJoinPOperator hhgj = (HybridHashGroupJoinPOperator) op.getPhysicalOperator();

+            if (size2 > 0 && size2 * hhgj.getFudgeFactor() * aggregatorSize <= hhgj.getMemSizeInFrames()) {

+                AlgebricksConfig.ALGEBRICKS_LOGGER.fine("// HybridHashGroupJoin inner branch " + opBuild

+                        + " fits in memory\n");

+                // maintains the local properties on the probe side

+                op.setPhysicalOperator(new InMemoryHashGroupJoinPOperator(hhgj.getKind(), hhgj.getPartitioningType(), hhgj

+                        .getKeysLeftBranch(), hhgj.getKeysRightBranch(), v.getNumberOfTuples() * 2));

+            }

+        }

+

+    }

+

+    public static boolean isHashJoinCondition(ILogicalExpression e, Collection<LogicalVariable> inLeftAll,

+            Collection<LogicalVariable> inRightAll, Collection<LogicalVariable> outLeftFields,

+            Collection<LogicalVariable> outRightFields) {

+        switch (e.getExpressionTag()) {

+            case FUNCTION_CALL: {

+                AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) e;

+                FunctionIdentifier fi = fexp.getFunctionIdentifier();

+                if (fi.equals(AlgebricksBuiltinFunctions.AND)) {

+                    for (Mutable<ILogicalExpression> a : fexp.getArguments()) {

+                        if (!isHashJoinCondition(a.getValue(), inLeftAll, inRightAll, outLeftFields,

+                                outRightFields)) {

+                            return false;

+                        }

+                    }

+                    return true;

+                } else {

+                    ComparisonKind ck = AlgebricksBuiltinFunctions.getComparisonType(fi);

+                    if (ck != ComparisonKind.EQ) {

+                        return false;

+                    }

+                    ILogicalExpression opLeft = fexp.getArguments().get(0).getValue();

+                    ILogicalExpression opRight = fexp.getArguments().get(1).getValue();

+                    if (opLeft.getExpressionTag() != LogicalExpressionTag.VARIABLE

+                            || opRight.getExpressionTag() != LogicalExpressionTag.VARIABLE) {

+                        return false;

+                    }

+                    LogicalVariable var1 = ((VariableReferenceExpression) opLeft).getVariableReference();

+                    if (inLeftAll.contains(var1) && !outLeftFields.contains(var1)) {

+                        outLeftFields.add(var1);

+                    } else if (inRightAll.contains(var1) && !outRightFields.contains(var1)) {

+                        outRightFields.add(var1);

+                    } else {

+                        return false;

+                    }

+                    LogicalVariable var2 = ((VariableReferenceExpression) opRight).getVariableReference();

+                    if (inLeftAll.contains(var2) && !outLeftFields.contains(var2)) {

+                        outLeftFields.add(var2);

+                    } else if (inRightAll.contains(var2) && !outRightFields.contains(var2)) {

+                        outRightFields.add(var2);

+                    } else {

+                        return false;

+                    }

+                    return true;

+                }

+            }

+            default: {

+                return false;

+            }

+        }

+    }

+

+    public static BroadcastSide getBroadcastJoinSide(ILogicalExpression e, List<LogicalVariable> varsLeft,

+            List<LogicalVariable> varsRight) {

+        if (e.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {

+            return null;

+        }

+        AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) e;

+        IExpressionAnnotation ann = fexp.getAnnotations().get(BroadcastExpressionAnnotation.BROADCAST_ANNOTATION_KEY);

+        if (ann == null) {

+            return null;

+        }

+        BroadcastSide side = (BroadcastSide) ann.getObject();

+        if (side == null) {

+            return null;

+        }

+        int i;

+        switch (side) {

+            case LEFT:

+                i = 0;

+                break;

+            case RIGHT:

+                i = 1;

+                break;

+            default:

+                return null;

+        }

+        ArrayList<LogicalVariable> vars = new ArrayList<LogicalVariable>();

+        fexp.getArguments().get(i).getValue().getUsedVariables(vars);

+        if (varsLeft.containsAll(vars)) {

+            return BroadcastSide.LEFT;

+        } else if (varsRight.containsAll(vars)) {

+            return BroadcastSide.RIGHT;

+        } else {

+            return null;

+        }

+    }

+}

diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java
index d7ebfb4..9f0ff4f 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java
@@ -8,6 +8,7 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -32,7 +33,7 @@
             computeFDsAndEqClassesWithVisitorRec((AbstractLogicalOperator) i.getValue(), ctx, visitor, visitSet);
         }
         if (op.hasNestedPlans()) {
-            for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+            for (ILogicalPlan p : ((INestedPlan) op).getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
                     AbstractLogicalOperator rootOp = (AbstractLogicalOperator) r.getValue();
                     computeFDsAndEqClassesWithVisitorRec(rootOp, ctx, visitor, visitSet);
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index 4d5d3d6..6ca4b1b 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -47,7 +47,6 @@
             public void init(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
                     throws HyracksDataException {
                 DataOutput output = tb.getDataOutput();
-                ftr.reset(accessor, tIndex);
                 for (int i = 0; i < aggs.length; i++) {
                     try {
                         int begin = tb.getSize();
@@ -62,17 +61,19 @@
                     }
                 }
 
-                // doing initial aggregate
-                ftr.reset(accessor, tIndex);
-                for (int i = 0; i < aggs.length; i++) {
-                    try {
-                        byte[] data = tb.getByteArray();
-                        int prevFieldPos = i + keys.length - 1;
-                        int start = prevFieldPos >= 0 ? tb.getFieldEndOffsets()[prevFieldPos] : 0;
-                        aggs[i].step(ftr, data, start, stateFieldLength[i]);
-                    } catch (AlgebricksException e) {
-                        throw new HyracksDataException(e);
-                    }
+                // doing initial aggregate only is accessor is not null
+                if(accessor != null) {
+	                ftr.reset(accessor, tIndex);
+	                for (int i = 0; i < aggs.length; i++) {
+	                    try {
+	                        byte[] data = tb.getByteArray();
+	                        int prevFieldPos = i + keys.length - 1;
+	                        int start = prevFieldPos >= 0 ? tb.getFieldEndOffsets()[prevFieldPos] : 0;
+	                        aggs[i].step(ftr, data, start, stateFieldLength[i]);
+	                    } catch (AlgebricksException e) {
+	                        throw new HyracksDataException(e);
+	                    }
+	                }
                 }
             }
 
@@ -144,4 +145,4 @@
 
         };
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GroupJoinHelper.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GroupJoinHelper.java
new file mode 100644
index 0000000..7858316
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GroupJoinHelper.java
@@ -0,0 +1,396 @@
+/*

+ * Copyright 2009-2010 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ * 

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package edu.uci.ics.hyracks.dataflow.std.join;

+

+import java.io.DataOutput;

+import java.io.IOException;

+import java.nio.ByteBuffer;

+import java.util.ArrayList;

+

+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

+import edu.uci.ics.hyracks.api.comm.IFrameWriter;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;

+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;

+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameConstants;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;

+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;

+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;

+

+public class GroupJoinHelper{

+	

+    private final IHyracksTaskContext ctx;

+

+    private final ArrayList<ByteBuffer> buffers;

+    private final ArrayList<ByteBuffer> stateBuffers;

+    /**

+     * Aggregate states: a list of states for all groups maintained in the main

+     * memory.

+     */

+    private int lastBIndex;

+    private int lastStateBIndex;

+    private final int[] storedKeys;

+    private final int[] keys;

+    private final int[] decors;

+    private final int[] keysAndDecors;

+    private final IBinaryComparator[] comparators;

+    private final ITuplePartitionComputer tpc;

+    private final IAggregatorDescriptor aggregator;

+    private final FrameTupleAppender appender, stateAppender;

+    private final FrameTupleAccessor stateAccessor, storedKeysAccessor;

+    private final ArrayTupleBuilder groupTupleBuilder, stateTupleBuilder, outputTupleBuilder, nullTupleBuilder;

+    private final ITuplePartitionComputer tpc1;

+    protected final FrameTuplePairComparator ftpc1;

+    private final int tableSize;

+    private final ISerializableTable hashTable;

+    private final boolean isLeftOuter;

+    private final INullWriter[] nullWriters;

+    private AggregateState state;

+    

+    public GroupJoinHelper(IHyracksTaskContext ctx, int[] gFields, int[] jFields, int[] dFields,

+    		IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory gByTpc0,

+    		ITuplePartitionComputerFactory gByTpc1, IAggregatorDescriptorFactory aggregatorFactory,

+    		RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean isLeftOuter,

+    		INullWriter[] nullWriters1, int tableSize) throws HyracksDataException {

+    	

+    	this.ctx = ctx;

+    	

+        buffers = new ArrayList<ByteBuffer>();

+        stateBuffers = new ArrayList<ByteBuffer>();

+

+        this.keys = gFields;

+        this.storedKeys = new int[keys.length];

+        this.decors = dFields;

+        this.keysAndDecors = new int[keys.length + decors.length];

+        

+        @SuppressWarnings("rawtypes")

+        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keys.length + decors.length + 2];

+        for (int i = 0; i < keys.length; ++i) {

+            storedKeys[i] = i;

+            storedKeySerDeser[i] = inRecordDescriptor.getFields()[keys[i]];

+            keysAndDecors[i] = i;

+        }

+        for (int i = 0; i < decors.length; ++i) {

+            keysAndDecors[keys.length + i] = keys.length + i;

+            storedKeySerDeser[keys.length + i] = inRecordDescriptor.getFields()[decors[i]];

+        }

+        storedKeySerDeser[keys.length + decors.length] = IntegerSerializerDeserializer.INSTANCE;

+        storedKeySerDeser[keys.length + decors.length + 1] = IntegerSerializerDeserializer.INSTANCE;

+

+        RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);

+        storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);

+        

+        comparators = new IBinaryComparator[comparatorFactories.length];

+        for (int i = 0; i < comparatorFactories.length; ++i) {

+            comparators[i] = comparatorFactories[i].createBinaryComparator();

+        }

+        tpc = gByTpc0.createPartitioner();

+

+        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keys, storedKeys);

+        

+        stateAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);

+

+        lastBIndex = -1;

+        lastStateBIndex = -1;

+

+        appender = new FrameTupleAppender(ctx.getFrameSize());

+        stateAppender = new FrameTupleAppender(ctx.getFrameSize());

+

+        addNewBuffer(false);

+        addNewBuffer(true);

+

+        groupTupleBuilder = new ArrayTupleBuilder(keysAndDecors.length + 2);

+        stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length - decors.length);

+        outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);

+

+        tpc1 = gByTpc1.createPartitioner();

+        ftpc1 = new FrameTuplePairComparator(storedKeys, jFields, comparators);

+        this.tableSize = tableSize;

+        

+        this.isLeftOuter = isLeftOuter;

+        this.nullWriters = nullWriters1;

+        if (isLeftOuter) {

+            nullTupleBuilder = new ArrayTupleBuilder(nullWriters1.length);

+            DataOutput out = nullTupleBuilder.getDataOutput();

+            for (int i = 0; i < nullWriters1.length; i++) {

+                nullWriters1[i].writeNull(out);

+                nullTupleBuilder.addFieldEndOffset();

+            }

+        } else {

+            nullTupleBuilder = null;

+        }

+

+        

+        hashTable = new SerializableHashTable(tableSize, ctx);

+    }

+

+    private void addNewBuffer(boolean isStateBuffer) {

+        ByteBuffer buffer = ctx.allocateFrame();

+        buffer.position(0);

+        buffer.limit(buffer.capacity());

+        if (!isStateBuffer) {

+        	buffers.add(buffer);

+        	appender.reset(buffer, true);

+        	++lastBIndex;

+        } else {

+        	stateBuffers.add(buffer);

+        	stateAppender.reset(buffer, true);

+        	++lastStateBIndex;

+        }

+    }

+    

+    private void updateAggregateIndex(FrameTupleAccessor accessor0, TuplePointer tp1, TuplePointer tp2) {

+    	

+            int tStart = accessor0.getTupleStartOffset(tp1.tupleIndex);

+            int fStartOffset = accessor0.getFieldSlotsLength() + tStart;

+

+            int fieldCount = accessor0.getFieldCount();

+            int fStart = accessor0.getFieldStartOffset(tp1.tupleIndex, fieldCount - 2);

+            

+            accessor0.getBuffer().putInt(fStart + fStartOffset, tp2.frameIndex);

+            fStart = accessor0.getFieldStartOffset(tp1.tupleIndex, fieldCount - 1);

+            accessor0.getBuffer().putInt(fStart + fStartOffset, tp2.tupleIndex);

+    }

+

+    private TuplePointer getAggregateIndex(FrameTupleAccessor accessor0, int tIndex) {

+    	

+    	TuplePointer tp = new TuplePointer();

+    	

+        int tStart = accessor0.getTupleStartOffset(tIndex);

+        int fStartOffset = accessor0.getFieldSlotsLength() + tStart;

+        int fieldCount = accessor0.getFieldCount();

+        int fStart = accessor0.getFieldStartOffset(tIndex, fieldCount - 2);

+

+    	tp.frameIndex = accessor0.getBuffer().getInt(fStart + fStartOffset);

+        fStart = accessor0.getFieldStartOffset(tIndex, fieldCount - 1);

+    	tp.tupleIndex = accessor0.getBuffer().getInt(fStart + fStartOffset);

+        return tp;

+    }

+    

+    public void getField(FrameTupleAccessor accessor, int tIndex, int fIndex) {

+        int startOffset = accessor.getTupleStartOffset(tIndex);

+        int fStartOffset = accessor.getFieldStartOffset(tIndex, fIndex);

+        int fLen = accessor.getFieldEndOffset(tIndex, fIndex) - fStartOffset;

+        System.out.print(" |");

+        for (int i=startOffset + accessor.getFieldSlotsLength() + fStartOffset; i < startOffset + accessor.getFieldSlotsLength() + fStartOffset + fLen; i++)

+        	System.out.print(" " + accessor.getBuffer().get(i));

+    }

+

+

+    public void build(FrameTupleAccessor accessor, ByteBuffer buffer) throws HyracksDataException, IOException {

+        accessor.reset(buffer);

+        int tCount = accessor.getTupleCount();

+        int entry;

+        TuplePointer storedTuplePointer = new TuplePointer();

+

+        for (int tIndex = 0; tIndex < tCount; ++tIndex) {

+        	entry = tpc.partition(accessor, tIndex, tableSize);

+        	

+            groupTupleBuilder.reset();

+//            System.out.print("\nBTuple : " + entry);

+            for (int k = 0; k < keys.length; k++) {

+//                getField(accessor, tIndex, keys[k]);

+                groupTupleBuilder.addField(accessor, tIndex, keys[k]);

+            }

+            for (int d = 0; d < decors.length; d++) {

+//                getField(accessor, tIndex, decors[d]);

+                groupTupleBuilder.addField(accessor, tIndex, decors[d]);

+            }

+            groupTupleBuilder.getDataOutput().writeInt(-1);

+            groupTupleBuilder.addFieldEndOffset();

+            groupTupleBuilder.getDataOutput().writeInt(-1);

+            groupTupleBuilder.addFieldEndOffset();

+

+            if (!appender.appendSkipEmptyField(groupTupleBuilder.getFieldEndOffsets(),

+                    groupTupleBuilder.getByteArray(), 0, groupTupleBuilder.getSize())) {

+                addNewBuffer(false);

+                if (!appender.appendSkipEmptyField(groupTupleBuilder.getFieldEndOffsets(),

+                        groupTupleBuilder.getByteArray(), 0, groupTupleBuilder.getSize())) {

+                    throw new HyracksDataException("Cannot init the aggregate state in a single frame.");

+                }

+            }

+

+            storedTuplePointer.frameIndex = lastBIndex;

+            storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;

+            hashTable.insert(entry, storedTuplePointer);

+        }

+    }

+    

+	public void insert(FrameTupleAccessor accessor, ByteBuffer buffer) throws HyracksDataException {

+        accessor.reset(buffer);

+        int tupleCount0 = accessor.getTupleCount();

+        int entry, offset;

+        boolean foundGroup = false;

+        TuplePointer storedTuplePointer = new TuplePointer();

+        TuplePointer stateTuplePointer = new TuplePointer();

+    	state = aggregator.createAggregateStates();

+        

+        for (int tIndex = 0; tIndex < tupleCount0; ++tIndex) {

+		    entry = tpc1.partition(accessor, tIndex, tableSize);

+		    

+            foundGroup = false;

+            offset = 0;

+            do {

+                hashTable.getTuplePointer(entry, offset++, storedTuplePointer);

+                if (storedTuplePointer.frameIndex < 0)

+                    break;

+                storedKeysAccessor.reset(buffers.get(storedTuplePointer.frameIndex));

+

+			    int c = ftpc1.compare(storedKeysAccessor, storedTuplePointer.tupleIndex, accessor, tIndex);

+                if (c == 0) {

+                    foundGroup = true;

+                    break;

+                }

+            } while (true);

+

+            if (foundGroup) {

+//                System.out.print("\nITuple : " + entry);

+//                getField(accessor, tIndex, storedKeys[0]);

+            	stateTuplePointer = getAggregateIndex(storedKeysAccessor, storedTuplePointer.tupleIndex);

+            	

+            	if(stateTuplePointer.frameIndex < 0) {

+            		outputTupleBuilder.reset();

+

+                    for (int k = 0; k < keys.length; k++) {

+                    	outputTupleBuilder.addField(storedKeysAccessor, storedTuplePointer.tupleIndex, storedKeys[k]);

+                    }

+

+                	aggregator.init(outputTupleBuilder, accessor, tIndex, state);

+

+    				if (!stateAppender.append(outputTupleBuilder.getFieldEndOffsets(),

+    						outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {

+    					addNewBuffer(true);

+        				if (!stateAppender.append(outputTupleBuilder.getFieldEndOffsets(),

+        						outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {

+    						throw new HyracksDataException("Cannot write the state into a frame.");

+    					}

+    				}

+

+    				stateTuplePointer.frameIndex = lastStateBIndex;

+    				stateTuplePointer.tupleIndex = stateAppender.getTupleCount() - 1;

+                	updateAggregateIndex(storedKeysAccessor, storedTuplePointer, stateTuplePointer);

+            	}

+            	else {

+            		stateAccessor.reset(stateBuffers.get(stateTuplePointer.frameIndex));

+                    aggregator.aggregate(accessor, tIndex, stateAccessor, stateTuplePointer.tupleIndex, state);

+//                    stateAccessor.prettyPrint();

+            	}

+            }

+        }

+	}

+	

+	public void write(IFrameWriter writer) throws HyracksDataException {

+		ByteBuffer buffer = ctx.allocateFrame();

+        ByteBuffer tmpBuffer = ctx.allocateFrame();

+		appender.reset(buffer, true);

+		TuplePointer stateTuplePointer = new TuplePointer();

+		int currentStateBuffer = -1;

+		boolean emitTuple;

+		

+		

+		for (int i = 0; i < buffers.size(); ++i) {

+			storedKeysAccessor.reset(buffers.get(i));

+

+			for (int tIndex = 0; tIndex < storedKeysAccessor.getTupleCount(); ++tIndex) {

+				stateTuplePointer = getAggregateIndex(storedKeysAccessor, tIndex);

+				outputTupleBuilder.reset();

+				emitTuple = false;

+

+				if(stateTuplePointer.frameIndex > -1) {

+					emitTuple = true;

+					for (int j = 0; j < keysAndDecors.length; j++) {

+						outputTupleBuilder.addField(storedKeysAccessor, tIndex, keysAndDecors[j]);

+					}

+					if (currentStateBuffer != stateTuplePointer.frameIndex) {

+						stateAccessor.reset(stateBuffers.get(stateTuplePointer.frameIndex));

+						currentStateBuffer = stateTuplePointer.frameIndex;

+					}

+					aggregator.outputFinalResult(outputTupleBuilder, stateAccessor, stateTuplePointer.tupleIndex, state);

+				}

+				else if(isLeftOuter) {

+					emitTuple=true;

+/*					for (int k = 0; k < nullWriters.length; k++) {

+			            nullWriters[k].writeNull(outputTupleBuilder.getDataOutput());

+			            outputTupleBuilder.addFieldEndOffset();

+					}

+*/			    	

+//					tmpBuffer.clear();

+					state = aggregator.createAggregateStates();

+                	stateAppender.reset(tmpBuffer, true);

+                	

+            		outputTupleBuilder.reset();

+

+                    for (int k = 0; k < storedKeys.length; k++) {

+                    	outputTupleBuilder.addField(storedKeysAccessor, tIndex, storedKeys[k]);

+                    }

+

+                	aggregator.init(outputTupleBuilder, null, 0, state);

+

+    				if (!stateAppender.append(outputTupleBuilder.getFieldEndOffsets(),

+    						outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize()))

+						throw new HyracksDataException("Cannot write the state into a frame.");

+

+            		outputTupleBuilder.reset();

+

+					for (int j = 0; j < keysAndDecors.length; j++) {

+						outputTupleBuilder.addField(storedKeysAccessor, tIndex, keysAndDecors[j]);

+					}

+					

+					stateAccessor.reset(tmpBuffer);

+					currentStateBuffer = -1;

+

+					aggregator.outputFinalResult(outputTupleBuilder, stateAccessor, 0, state);

+				}

+				

+				if (emitTuple) {

+					if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),

+							outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {

+						writer.nextFrame(buffer);

+						appender.reset(buffer, true);

+						if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),

+								outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {

+							throw new HyracksDataException("Cannot write groupjoin output into a frame.");

+						}

+					}

+				}

+			}

+		}

+

+		if (appender.getTupleCount() != 0) {

+			writer.nextFrame(buffer);

+		}

+	}

+	

+    public void close() throws HyracksDataException {

+    	buffers.clear();

+    	stateBuffers.clear();

+    	state = null;

+    }

+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashGroupJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashGroupJoinOperatorDescriptor.java
new file mode 100644
index 0000000..60abc12
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashGroupJoinOperatorDescriptor.java
@@ -0,0 +1,551 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class HybridHashGroupJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
+    private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
+
+    private final int memsize;
+    private static final long serialVersionUID = 1L;
+    private final int inputsize0;
+    private final double factor;
+    private final int recordsPerFrame;
+    private final int[] keys0;
+    private final int[] keys1;
+    private final int[] projectFields;
+    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+    private final IBinaryComparatorFactory[] joinComparatorFactories;
+    private final IBinaryComparatorFactory[] groupComparatorFactories;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
+
+    /**
+     * @param spec
+     * @param memsize
+     *            in frames
+     * @param inputsize0
+     *            in frames
+     * @param recordsPerFrame
+     * @param factor
+     * @param keys0
+     * @param keys1
+     * @param projectFields TODO
+     * @param hashFunctionFactories
+     * @param joinComparatorFactories
+     * @param groupComparatorFactories
+     * @param aggregatorFactory
+     * @param recordDescriptor
+     * @param isLeftOuter TODO
+     * @throws HyracksDataException
+     */
+
+    public HybridHashGroupJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, int[] projectFields,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] joinComparatorFactories,
+            IBinaryComparatorFactory[] groupComparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.factor = factor;
+        this.recordsPerFrame = recordsPerFrame;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.projectFields = projectFields;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.joinComparatorFactories = joinComparatorFactories;
+        this.groupComparatorFactories = groupComparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
+        ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
+        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(p1Aid, p2Aid);
+        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(p1Aid, p2Aid);
+
+        builder.addActivity(this, phase1);
+        builder.addSourceEdge(0, phase1, 0);
+
+        builder.addActivity(this, phase2);
+        builder.addSourceEdge(1, phase2, 0);
+
+        builder.addBlockingEdge(phase1, phase2);
+
+        builder.addTargetEdge(0, phase2, 0);
+    }
+
+    public static class BuildAndPartitionTaskState extends AbstractStateObject {
+        private RunFileWriter[] fWriters;
+        private InMemoryHashGroupJoin joiner;
+        private int nPartitions;
+        private int memoryForHashtable;
+
+        public BuildAndPartitionTaskState() {
+        }
+
+        private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+
+        }
+
+    }
+
+    private class BuildAndPartitionActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        private final ActivityId p2Aid;
+        
+        public BuildAndPartitionActivityNode(ActivityId p1Aid, ActivityId p2Aid) {
+            super(p1Aid);
+            this.p2Aid = p2Aid;
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(p2Aid, 0);
+            final IBinaryComparator[] comparators = new IBinaryComparator[groupComparatorFactories.length];
+            for (int i = 0; i < groupComparatorFactories.length; ++i) {
+                comparators[i] = groupComparatorFactories[i].createBinaryComparator();
+            }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if(isLeftOuter){
+	                for (int i = 0; i < nullWriterFactories1.length; i++) {
+	                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+	            }
+            }
+
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
+                        .getJobId(), new TaskId(getActivityId(), partition));
+                private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+                private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys0,
+                        hashFunctionFactories).createPartitioner();
+                private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+                private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
+                private ByteBuffer[] bufferForPartitions;
+                private final ByteBuffer inBuffer = ctx.allocateFrame();
+
+                @Override
+                public void close() throws HyracksDataException {
+                    if (state.memoryForHashtable != 0)
+                        build(inBuffer);
+
+                    for (int i = 0; i < state.nPartitions; i++) {
+                        ByteBuffer buf = bufferForPartitions[i];
+                        accessorBuild.reset(buf);
+                        if (accessorBuild.getTupleCount() > 0) {
+                            write(i, buf);
+                        }
+                        closeWriter(i);
+                    }
+
+                    ctx.setStateObject(state);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
+                    if (state.memoryForHashtable != memsize - 2) {
+                        accessorBuild.reset(buffer);
+                        int tCount = accessorBuild.getTupleCount();
+                        for (int i = 0; i < tCount; ++i) {
+                            int entry = -1;
+                            if (state.memoryForHashtable == 0) {
+                                entry = hpcBuild.partition(accessorBuild, i, state.nPartitions);
+                                boolean newBuffer = false;
+                                ByteBuffer bufBi = bufferForPartitions[entry];
+                                while (true) {
+                                    appender.reset(bufBi, newBuffer);
+                                    if (appender.append(accessorBuild, i)) {
+                                        break;
+                                    } else {
+                                        write(entry, bufBi);
+                                        bufBi.clear();
+                                        newBuffer = true;
+                                    }
+                                }
+                            } else {
+                                entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
+                                if (entry < state.memoryForHashtable) {
+                                    while (true) {
+                                        if (!ftappender.append(accessorBuild, i)) {
+                                            build(inBuffer);
+
+                                            ftappender.reset(inBuffer, true);
+                                        } else {
+                                            break;
+                                        }
+                                    }
+                                } else {
+                                    entry %= state.nPartitions;
+                                    boolean newBuffer = false;
+                                    ByteBuffer bufBi = bufferForPartitions[entry];
+                                    while (true) {
+                                        appender.reset(bufBi, newBuffer);
+                                        if (appender.append(accessorBuild, i)) {
+                                            break;
+                                        } else {
+                                            write(entry, bufBi);
+                                            bufBi.clear();
+                                            newBuffer = true;
+                                        }
+                                    }
+                                }
+                            }
+
+                        }
+                    } else {
+                        build(buffer);
+                    }
+
+                }
+
+                private void build(ByteBuffer inBuffer) throws HyracksDataException {
+                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                    FrameUtils.copy(inBuffer, copyBuffer);
+                    try{
+                    	state.joiner.build(copyBuffer);
+                    } catch (IOException e){
+                    	
+                    }
+                }
+
+                @Override
+                public void open() throws HyracksDataException {
+                    if (memsize > 1) {
+                        if (memsize > inputsize0) {
+                            state.nPartitions = 0;
+                        } else {
+                            state.nPartitions = (int) (Math.ceil((double) (inputsize0 * factor / nPartitions - memsize)
+                                    / (double) (memsize - 1)));
+                        }
+                        if (state.nPartitions <= 0) {
+                            // becomes in-memory HJ
+                            state.memoryForHashtable = memsize - 2;
+                            state.nPartitions = 0;
+                        } else {
+                            state.memoryForHashtable = memsize - state.nPartitions - 2;
+                            if (state.memoryForHashtable < 0) {
+                                state.memoryForHashtable = 0;
+                                state.nPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
+                            }
+                        }
+                    } else {
+                        throw new HyracksDataException("not enough memory");
+                    }
+
+                    ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories);
+                    ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories);
+                    int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
+                    state.joiner = new InMemoryHashGroupJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), groupComparatorFactories, hpcf0, hpcf1, rd0, recordDescriptors[0],
+                            aggregatorFactory, keys1, keys0, projectFields, isLeftOuter, nullWriters1);
+                    bufferForPartitions = new ByteBuffer[state.nPartitions];
+                    state.fWriters = new RunFileWriter[state.nPartitions];
+                    for (int i = 0; i < state.nPartitions; i++) {
+                        bufferForPartitions[i] = ctx.allocateFrame();
+                    }
+
+                    ftappender.reset(inBuffer, true);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                }
+
+                private void closeWriter(int i) throws HyracksDataException {
+                    RunFileWriter writer = state.fWriters[i];
+                    if (writer != null) {
+                        writer.close();
+                    }
+                }
+
+                private void write(int i, ByteBuffer head) throws HyracksDataException {
+                    RunFileWriter writer = state.fWriters[i];
+                    if (writer == null) {
+                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                                BuildAndPartitionActivityNode.class.getSimpleName());
+                        writer = new RunFileWriter(file, ctx.getIOManager());
+                        writer.open();
+                        state.fWriters[i] = writer;
+                    }
+                    writer.nextFrame(head);
+                }
+            };
+            return op;
+        }
+    }
+
+    private class PartitionAndJoinActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        private final ActivityId p1Aid;
+        
+        public PartitionAndJoinActivityNode(ActivityId p1Aid, ActivityId p2Aid) {
+            super(p2Aid);
+            this.p1Aid = p1Aid;
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(p1Aid, 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            final IBinaryComparator[] comparators = new IBinaryComparator[groupComparatorFactories.length];
+            for (int i = 0; i < groupComparatorFactories.length; ++i) {
+                comparators[i] = groupComparatorFactories[i].createBinaryComparator();
+            }
+            final INullWriter[] nullWriters1 = new INullWriter[nullWriterFactories1.length];
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+
+            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private BuildAndPartitionTaskState state;
+                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+                private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+                        hashFunctionFactories);
+                private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+                        hashFunctionFactories);
+                private final ITuplePartitionComputer hpcProbe = hpcf1.createPartitioner();
+
+                private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+                private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
+                private final ByteBuffer inBuffer = ctx.allocateFrame();
+                private final ByteBuffer outBuffer = ctx.allocateFrame();
+                private RunFileWriter[] buildWriters;
+                private RunFileWriter[] probeWriters;
+                private ByteBuffer[] bufferForPartitions;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
+                            BUILD_AND_PARTITION_ACTIVITY_ID), partition));
+                    writer.open();
+                    buildWriters = state.fWriters;
+                    probeWriters = new RunFileWriter[state.nPartitions];
+                    bufferForPartitions = new ByteBuffer[state.nPartitions];
+                    for (int i = 0; i < state.nPartitions; i++) {
+                        bufferForPartitions[i] = ctx.allocateFrame();
+                    }
+                    appender.reset(outBuffer, true);
+                    ftap.reset(inBuffer, true);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    if (state.memoryForHashtable != memsize - 2) {
+                        accessorProbe.reset(buffer);
+                        int tupleCount0 = accessorProbe.getTupleCount();
+                        for (int i = 0; i < tupleCount0; ++i) {
+
+                            int entry = -1;
+                            if (state.memoryForHashtable == 0) {
+                                entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
+                                boolean newBuffer = false;
+                                ByteBuffer outbuf = bufferForPartitions[entry];
+                                while (true) {
+                                    appender.reset(outbuf, newBuffer);
+                                    if (appender.append(accessorProbe, i)) {
+                                        break;
+                                    } else {
+                                        write(entry, outbuf);
+                                        outbuf.clear();
+                                        newBuffer = true;
+                                    }
+                                }
+                            } else {
+                                entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
+                                if (entry < state.memoryForHashtable) {
+                                    while (true) {
+                                        if (!ftap.append(accessorProbe, i)) {
+                                            state.joiner.join(inBuffer, writer);
+                                            ftap.reset(inBuffer, true);
+                                        } else
+                                            break;
+                                    }
+
+                                } else {
+                                    entry %= state.nPartitions;
+                                    boolean newBuffer = false;
+                                    ByteBuffer outbuf = bufferForPartitions[entry];
+                                    while (true) {
+                                        appender.reset(outbuf, newBuffer);
+                                        if (appender.append(accessorProbe, i)) {
+                                            break;
+                                        } else {
+                                            write(entry, outbuf);
+                                            outbuf.clear();
+                                            newBuffer = true;
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    } else {
+                        state.joiner.join(buffer, writer);
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    state.joiner.join(inBuffer, writer);
+                    state.joiner.write(writer);
+                    state.joiner.closeJoin(writer);
+                    
+                    try{
+                    if (state.memoryForHashtable != memsize - 2) {
+                        for (int i = 0; i < state.nPartitions; i++) {
+                            ByteBuffer buf = bufferForPartitions[i];
+                            accessorProbe.reset(buf);
+                            if (accessorProbe.getTupleCount() > 0) {
+                                write(i, buf);
+                            }
+                            closeWriter(i);
+                        }
+
+                        inBuffer.clear();
+                        int tableSize = -1;
+                        if (state.memoryForHashtable == 0) {
+                            tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
+                        } else {
+                            tableSize = (int) (memsize * recordsPerFrame * factor);
+                        }
+                        for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
+                            RunFileWriter buildWriter = buildWriters[partitionid];
+                            RunFileWriter probeWriter = probeWriters[partitionid];
+                            if (buildWriter == null) {
+                                continue;
+                            }
+                            InMemoryHashGroupJoin joiner = new InMemoryHashGroupJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+                                    new FrameTupleAccessor(ctx.getFrameSize(), rd1), groupComparatorFactories, hpcf0, hpcf1, rd0, recordDescriptors[0],
+                                    aggregatorFactory, keys1, keys0, projectFields, isLeftOuter, nullWriters1);
+
+                            RunFileReader buildReader = buildWriter.createReader();
+                            buildReader.open();
+                            while (buildReader.nextFrame(inBuffer)) {
+                            	ByteBuffer copyBuffer = ctx.allocateFrame();
+                            	FrameUtils.copy(inBuffer, copyBuffer);
+                            	joiner.build(copyBuffer);
+                            	inBuffer.clear();
+                            }
+                            buildReader.close();
+
+                            // probe
+                            if (probeWriter != null) {
+	                            RunFileReader probeReader = probeWriter.createReader();
+	                            probeReader.open();
+	                            while (probeReader.nextFrame(inBuffer)) {
+	                                joiner.join(inBuffer, writer);
+	                                inBuffer.clear();
+	                            }
+	                            probeReader.close();
+                            }
+                            joiner.write(writer);
+                            joiner.closeJoin(writer);
+                        }
+                    }
+                    }
+                    catch(Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    }
+                    finally {
+                    	writer.close();
+                    }
+                }
+
+                private void closeWriter(int i) throws HyracksDataException {
+                    RunFileWriter writer = probeWriters[i];
+                    if (writer != null) {
+                        writer.close();
+                    }
+                }
+
+                private void write(int i, ByteBuffer head) throws HyracksDataException {
+                    RunFileWriter writer = probeWriters[i];
+                    if (writer == null) {
+                        FileReference file = ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class
+                                .getSimpleName());
+                        writer = new RunFileWriter(file, ctx.getIOManager());
+                        writer.open();
+                        probeWriters[i] = writer;
+                    }
+                    writer.nextFrame(head);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    writer.fail();
+                }
+            };
+            return op;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoin.java
new file mode 100644
index 0000000..ff64600
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoin.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class InMemoryHashGroupJoin {
+    private final List<ByteBuffer> buffers;
+    private final FrameTupleAccessor accessorBuild;
+    private final FrameTupleAccessor accessorProbe;
+    private final FrameTupleAppender appender;
+    private final IBinaryComparatorFactory[] comparator;
+    private final ByteBuffer outBuffer;
+	private final GroupJoinHelper gByTable;
+
+    public InMemoryHashGroupJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
+            FrameTupleAccessor accessor1, IBinaryComparatorFactory[] groupComparator, ITuplePartitionComputerFactory gByTpc0,
+            ITuplePartitionComputerFactory gByTpc1, RecordDescriptor gByInRecordDescriptor, RecordDescriptor gByOutRecordDescriptor,
+            IAggregatorDescriptorFactory aggregatorFactory, int[] joinAttributes, int[] groupAttributes, 
+            int[] decorAttributes, boolean isLeftOuter, INullWriter[] nullWriters1) throws HyracksDataException {
+        buffers = new ArrayList<ByteBuffer>();
+        this.accessorBuild = accessor0;
+        this.accessorProbe = accessor1;
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        this.comparator = groupComparator;
+        
+        outBuffer = ctx.allocateFrame();
+        appender.reset(outBuffer, true);
+
+        gByTable = new GroupJoinHelper(ctx, groupAttributes, joinAttributes, decorAttributes, comparator, gByTpc0, 
+        		gByTpc1, aggregatorFactory, gByInRecordDescriptor, gByOutRecordDescriptor, isLeftOuter, nullWriters1, tableSize);
+    }
+
+    public void build(ByteBuffer buffer) throws HyracksDataException, IOException {
+        buffers.add(buffer);
+        gByTable.build(accessorBuild, buffer);
+    }
+
+    public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+        gByTable.insert(accessorProbe, buffer);
+    }
+
+    public void write(IFrameWriter writer) throws HyracksDataException {
+    	gByTable.write(writer);
+    }
+    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            flushFrame(outBuffer, writer);
+        }
+        gByTable.close();
+    }
+
+    private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        writer.nextFrame(buffer);
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoinOperatorDescriptor.java
new file mode 100644
index 0000000..d03af17
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoinOperatorDescriptor.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class InMemoryHashGroupJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final int BUILD_ACTIVITY_ID = 0;
+    private static final int PROBE_ACTIVITY_ID = 1;
+    private static final int OUTPUT_ACTIVITY_ID = 2;
+
+    private static final long serialVersionUID = 1L;
+    private final int[] keys0;
+    private final int[] keys1;
+    private final int[] projectFields;
+    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+    private final IBinaryComparatorFactory[] joinComparatorFactories;
+    private final IBinaryComparatorFactory[] groupComparatorFactories;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
+    private final int tableSize;
+
+    public InMemoryHashGroupJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
+    		int[] projectFields, IBinaryHashFunctionFactory[] hashFunctionFactories,
+    		IBinaryComparatorFactory[] joinComparatorFactories, IBinaryComparatorFactory[] groupComparatorFactories,
+    		IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+    		INullWriterFactory[] nullWriterFactories1, int tableSize) {
+        super(spec, 2, 1);
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.projectFields = projectFields;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.joinComparatorFactories = joinComparatorFactories;
+        this.groupComparatorFactories = groupComparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        recordDescriptors[0] = recordDescriptor;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
+        this.tableSize = tableSize;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        ActivityId hbaId = new ActivityId(odId, BUILD_ACTIVITY_ID);
+        ActivityId hpaId = new ActivityId(odId, PROBE_ACTIVITY_ID);
+        ActivityId oaId = new ActivityId(odId, OUTPUT_ACTIVITY_ID);
+        HashBuildActivityNode hba = new HashBuildActivityNode(hbaId, hpaId);
+        HashProbeActivityNode hpa = new HashProbeActivityNode(hbaId, hpaId);
+        OutputActivity oa = new OutputActivity(oaId);
+
+        builder.addActivity(this, hba);
+        builder.addSourceEdge(0, hba, 0);
+
+        builder.addActivity(this, hpa);
+        builder.addSourceEdge(1, hpa, 0);
+
+        builder.addActivity(this, oa);
+        builder.addTargetEdge(0, oa, 0);
+
+        builder.addBlockingEdge(hba, hpa);
+        builder.addBlockingEdge(hpa, oa);
+
+    }
+
+    public static class HashBuildTaskState extends AbstractStateObject {
+        private InMemoryHashGroupJoin joiner;
+
+        public HashBuildTaskState() {
+        }
+
+        private HashBuildTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+
+        }
+    }
+
+    private class HashBuildActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        private final ActivityId hpaId;
+        
+        public HashBuildActivityNode(ActivityId hbaId, ActivityId hpaId) {
+            super(hbaId);
+            this.hpaId = hpaId;
+        }
+        
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(hpaId, 0);
+
+            final IBinaryComparator[] comparators = new IBinaryComparator[joinComparatorFactories.length];
+            for (int i = 0; i < joinComparatorFactories.length; ++i) {
+                comparators[i] = joinComparatorFactories[i].createBinaryComparator();
+            }
+            
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if(isLeftOuter){
+	                for (int i = 0; i < nullWriterFactories1.length; i++) {
+	                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+	            }
+            }
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private HashBuildTaskState state;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories);
+                    ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories);
+                    state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+
+                    state.joiner = new InMemoryHashGroupJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), groupComparatorFactories, hpcf0 /*gByTpc0*/, hpcf1 /*gByTpc1*/, rd0,
+                            recordDescriptors[0], aggregatorFactory, keys1, keys0, projectFields, isLeftOuter, nullWriters1);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                    FrameUtils.copy(buffer, copyBuffer);
+                    try{
+                    	state.joiner.build(copyBuffer);
+                    } catch (IOException e){
+                    	
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    ctx.setStateObject(state);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                }
+            };
+            return op;
+        }
+
+    }
+
+    private class HashProbeActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        private final ActivityId hbaId;
+        
+        public HashProbeActivityNode(ActivityId hbaId, ActivityId hpaId) {
+            super(hpaId);
+            this.hbaId = hbaId;
+        }
+        
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private HashBuildTaskState state;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
+                            BUILD_ACTIVITY_ID), partition));
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                	// Probe activity joiner
+                    state.joiner.join(buffer, writer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    ctx.setStateObject(state);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    throw new HyracksDataException("InMemoryHashGroupJoinOperator has failed.");
+                }
+                
+            };
+            return op;
+        }
+
+	}
+
+    private class OutputActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public OutputActivity(ActivityId id) {
+            super(id);
+        }
+        
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
+                @Override
+                public void initialize() throws HyracksDataException {
+                    HashBuildTaskState buildState = (HashBuildTaskState) ctx.getStateObject(new TaskId(
+                            new ActivityId(getOperatorId(), BUILD_ACTIVITY_ID), partition));
+                    InMemoryHashGroupJoin table = buildState.joiner;
+                    writer.open();
+                    try {
+                        table.write(writer);
+                    } catch (Exception e) {
+                        writer.fail();
+                        throw new HyracksDataException(e);
+                    } finally {
+                    	table.closeJoin(writer);
+                        writer.close();
+                    }
+
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderGroupJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderGroupJoinTest.java
new file mode 100644
index 0000000..9e0a265
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderGroupJoinTest.java
@@ -0,0 +1,447 @@
+/*

+ * Copyright 2009-2010 by The Regents of the University of California

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * you may obtain a copy of the License from

+ * 

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package edu.uci.ics.hyracks.tests.integration;

+

+import java.io.DataOutput;

+import java.io.File;

+import java.io.IOException;

+import java.util.logging.Level;

+import java.util.logging.Logger;

+

+import org.junit.Test;

+

+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;

+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;

+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.api.io.FileReference;

+import edu.uci.ics.hyracks.api.job.JobSpecification;

+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;

+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;

+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;

+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;

+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;

+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;

+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;

+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;

+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;

+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;

+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;

+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;

+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;

+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;

+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;

+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;

+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;

+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashGroupJoinOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashGroupJoinOperatorDescriptor;

+

+public class TPCHCustomerOrderGroupJoinTest extends AbstractIntegrationTest {

+    private static final Logger LOGGER = Logger.getLogger(TPCHCustomerOrderGroupJoinTest.class.getName());

+    

+    private static class NoopNullWriterFactory implements INullWriterFactory {

+

+        private static final long serialVersionUID = 1L;

+        public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();

+

+        private NoopNullWriterFactory() {

+        }

+

+        @Override

+        public INullWriter createNullWriter() {

+            return new INullWriter() {

+                @Override

+                public void writeNull(DataOutput out) throws HyracksDataException {

+                    try {

+                        out.writeInt(0);

+                    } catch (IOException e) {

+                        throw new HyracksDataException(e);

+                    }

+                }

+            };

+        }

+    }

+

+    /*

+     * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,

+     * C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY

+     * INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT

+     * NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );

+     * TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL,

+     * O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE

+     * DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY

+     * CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT

+     * NULL, O_COMMENT VARCHAR(79) NOT NULL );

+     */

+

+    @Test

+    public void customerOrderInMemoryGroupJoin() throws Exception {

+    	if (LOGGER.isLoggable(Level.INFO)) {

+    	    LOGGER.info("*customerOrderInMemoryGroupJoin start");

+    	}

+    	long runTime = System.currentTimeMillis();

+        JobSpecification spec = new JobSpecification();

+

+        String custTable = "data/tpch0.001/customer.tbl";

+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(

+                custTable))) };

+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);

+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {

+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });

+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,

+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] {

+                		IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

+                        }, '|'), custDesc);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);

+

+        String ordTable = "data/tpch0.001/orders.tbl";

+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(

+                ordTable))) };

+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);

+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {

+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE });

+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,

+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { 

+                		IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);

+

+        RecordDescriptor custOrderGroupJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {

+        		IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });

+        

+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[1];

+        for (int j = 0; j < nullWriterFactories.length; j++) {

+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;

+        }

+        

+        InMemoryHashGroupJoinOperatorDescriptor groupJoin = new InMemoryHashGroupJoinOperatorDescriptor(

+                spec,

+                new int[] { 0 },

+                new int[] { 1 },

+                new int[] { },

+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) },

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },

+                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),

+                custOrderGroupJoinDesc,

+                true,

+                nullWriterFactories,

+                128);

+        

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, groupJoin, NC1_ID);

+

+        ExternalGroupOperatorDescriptor finalGroup = new ExternalGroupOperatorDescriptor(

+                spec,

+                new int[] { 1 },

+                4,

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+                        .of(IntegerPointable.FACTORY) },

+                new IntegerNormalizedKeyComputerFactory(),

+		        new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),

+		        new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false) }),

+                custOrderGroupJoinDesc,

+                new HashSpillableTableFactory(

+                        new FieldHashPartitionComputerFactory(

+                        		new int[] { 1 },

+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+                                        .of(IntegerPointable.FACTORY) }),

+                        128), true);

+

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, finalGroup, NC1_ID);

+

+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,

+                createTempFile().getAbsolutePath()) });

+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, "|");

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

+

+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(custJoinConn, custScanner, 0, groupJoin, 0);

+

+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(ordJoinConn, ordScanner, 0, groupJoin, 1);

+

+        IConnectorDescriptor finalGroupConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(finalGroupConn, groupJoin, 0, finalGroup, 0);

+

+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(joinPrinterConn, finalGroup, 0, printer, 0);

+

+        spec.addRoot(printer);

+        runTest(spec);

+        

+    	runTime = System.currentTimeMillis() - runTime;

+    	if (LOGGER.isLoggable(Level.INFO)) {

+    	    LOGGER.info("*customerOrderInMemoryGroupJoin stop. run time: " + runTime);

+    	}

+    }

+

+    @Test

+    public void customerOrderGraceHashGroupJoin() throws Exception {

+    	if (LOGGER.isLoggable(Level.INFO)) {

+    	    LOGGER.info("*customerOrderGraceHashGroupJoin start");

+    	}

+    	long runTime = System.currentTimeMillis();

+        JobSpecification spec = new JobSpecification();

+

+        String custTable = "data/tpch0.001/customer.tbl";

+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(

+                custTable))) };

+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);

+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {

+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });

+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,

+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] {

+                		IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

+                        }, '|'), custDesc);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);

+

+        String ordTable = "data/tpch0.001/orders.tbl";

+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(

+                ordTable))) };

+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);

+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {

+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE });

+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,

+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { 

+                		IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);

+

+        RecordDescriptor custOrderGroupJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {

+        		IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });

+        

+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[1];

+        for (int j = 0; j < nullWriterFactories.length; j++) {

+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;

+        }        

+        

+        HybridHashGroupJoinOperatorDescriptor groupJoin = new HybridHashGroupJoinOperatorDescriptor(

+//        GraceHashGroupJoinOperatorDescriptor groupJoin = new GraceHashGroupJoinOperatorDescriptor(

+                spec,

+                5,

+                128,

+                200,

+                1.2,

+                new int[] { 0 },

+                new int[] { 1 },

+                new int[]{ },

+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) },

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },

+                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),

+                custOrderGroupJoinDesc, true, nullWriterFactories);

+        

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, groupJoin, NC1_ID);

+

+        ExternalGroupOperatorDescriptor finalGroup = new ExternalGroupOperatorDescriptor(

+                spec,

+                new int[] { 1 },

+                4,

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+                        .of(IntegerPointable.FACTORY) },

+                new IntegerNormalizedKeyComputerFactory(),

+		        new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),

+		        new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false) }),

+                custOrderGroupJoinDesc,

+                new HashSpillableTableFactory(

+                        new FieldHashPartitionComputerFactory(

+                        		new int[] { 1 },

+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+                                        .of(IntegerPointable.FACTORY) }),

+                        128), true);

+

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, finalGroup, NC1_ID);

+

+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,

+                createTempFile().getAbsolutePath()) });

+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, "|");

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

+

+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(custJoinConn, custScanner, 0, groupJoin, 0);

+

+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(ordJoinConn, ordScanner, 0, groupJoin, 1);

+

+        IConnectorDescriptor finalGroupConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(finalGroupConn, groupJoin, 0, finalGroup, 0);

+

+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(joinPrinterConn, finalGroup, 0, printer, 0);

+

+        spec.addRoot(printer);

+        runTest(spec);

+        

+    	runTime = System.currentTimeMillis() - runTime;

+    	if (LOGGER.isLoggable(Level.INFO)) {

+    	    LOGGER.info("*customerOrderGraceHashGroupJoin stop. run time: " + runTime);

+    	}

+    }

+

+    @Test

+    public void customerOrderHybridHashGroupJoin() throws Exception {

+    	if (LOGGER.isLoggable(Level.INFO)) {

+    	    LOGGER.info("*customerOrderHybridHashGroupJoin start");

+    	}

+    	long runTime = System.currentTimeMillis();

+        JobSpecification spec = new JobSpecification();

+

+        String custTable = "data/tpch0.001/customer.tbl";

+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(

+                custTable))) };

+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);

+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {

+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });

+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,

+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] {

+                		IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

+                        }, '|'), custDesc);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);

+

+        String ordTable = "data/tpch0.001/orders.tbl";

+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(

+                ordTable))) };

+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);

+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {

+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,

+                UTF8StringSerializerDeserializer.INSTANCE });

+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,

+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { 

+                		IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,

+                        UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);

+

+        RecordDescriptor custOrderGroupJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {

+        		IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });

+        

+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[1];

+        for (int j = 0; j < nullWriterFactories.length; j++) {

+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;

+        }        

+        

+        HybridHashGroupJoinOperatorDescriptor groupJoin = new HybridHashGroupJoinOperatorDescriptor(

+                spec,

+                5,

+                128,

+                200,

+                1.2,

+                new int[] { 0 },

+                new int[] { 1 },

+                new int[]{ },

+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) },

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },

+                new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),

+                custOrderGroupJoinDesc, true, nullWriterFactories);

+        

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, groupJoin, NC1_ID);

+

+        ExternalGroupOperatorDescriptor finalGroup = new ExternalGroupOperatorDescriptor(

+                spec,

+                new int[] { 1 },

+                4,

+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+                        .of(IntegerPointable.FACTORY) },

+                new IntegerNormalizedKeyComputerFactory(),

+		        new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),

+		        new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false) }),

+                custOrderGroupJoinDesc,

+                new HashSpillableTableFactory(

+                        new FieldHashPartitionComputerFactory(

+                        		new int[] { 1 },

+                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+                                        .of(IntegerPointable.FACTORY) }),

+                        128), true);

+

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, finalGroup, NC1_ID);

+

+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,

+                createTempFile().getAbsolutePath()) });

+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, "|");

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

+

+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(custJoinConn, custScanner, 0, groupJoin, 0);

+

+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(ordJoinConn, ordScanner, 0, groupJoin, 1);

+

+        IConnectorDescriptor finalGroupConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(finalGroupConn, groupJoin, 0, finalGroup, 0);

+

+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(joinPrinterConn, finalGroup, 0, printer, 0);

+

+        spec.addRoot(printer);

+        runTest(spec);

+        

+    	runTime = System.currentTimeMillis() - runTime;

+    	if (LOGGER.isLoggable(Level.INFO)) {

+    	    LOGGER.info("*customerOrderHybridHashGroupJoin stop. run time: " + runTime);

+    	}

+    }

+}
\ No newline at end of file