fix the binary hash function family issue in hivesterix

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging_bigmerge_target@2759 123451ca-8445-de46-9d55-352943316053
diff --git a/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java b/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
index 0485380..1fb973e 100644
--- a/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
+++ b/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
@@ -80,770 +80,729 @@
 @SuppressWarnings("rawtypes")

 public class HiveAlgebricksTranslator implements Translator {

 

-	private int currentVariable = 0;

+    private int currentVariable = 0;

 

-	private List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>();

+    private List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>();

 

-	private boolean continueTraverse = true;

+    private boolean continueTraverse = true;

 

-	private IMetadataProvider<PartitionDesc, Object> metaData;

+    private IMetadataProvider<PartitionDesc, Object> metaData;

 

-	/**

-	 * map variable name to the logical variable

-	 */

-	private HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>();

+    /**

+     * map variable name to the logical variable

+     */

+    private HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>();

 

-	/**

-	 * map field name to LogicalVariable

-	 */

-	private HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>();

+    /**

+     * map field name to LogicalVariable

+     */

+    private HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>();

 

-	/**

-	 * map logical variable to name

-	 */

-	private HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>();

+    /**

+     * map logical variable to name

+     */

+    private HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>();

 

-	/**

-	 * asterix root operators

-	 */

-	private List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>();

+    /**

+     * asterix root operators

+     */

+    private List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>();

 

-	/**

-	 * a list of visitors

-	 */

-	private List<Visitor> visitors = new ArrayList<Visitor>();

+    /**

+     * a list of visitors

+     */

+    private List<Visitor> visitors = new ArrayList<Visitor>();

 

-	/**

-	 * output writer to print things out

-	 */

-	private static PrintWriter outputWriter = new PrintWriter(

-			new OutputStreamWriter(System.out));

+    /**

+     * output writer to print things out

+     */

+    private static PrintWriter outputWriter = new PrintWriter(new OutputStreamWriter(System.out));

 

-	/**

-	 * map a logical variable to type info

-	 */

-	private HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>();

+    /**

+     * map a logical variable to type info

+     */

+    private HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>();

 

-	@Override

-	public LogicalVariable getVariable(String fieldName, TypeInfo type) {

-		LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);

-		if (var == null) {

-			currentVariable++;

-			var = new LogicalVariable(currentVariable);

-			fieldToLogicalVariableMap.put(fieldName, var);

-			nameToLogicalVariableMap.put(var.toString(), var);

-			variableToType.put(var, type);

-			logicalVariableToFieldMap.put(var, fieldName);

-		}

-		return var;

-	}

+    @Override

+    public LogicalVariable getVariable(String fieldName, TypeInfo type) {

+        LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);

+        if (var == null) {

+            currentVariable++;

+            var = new LogicalVariable(currentVariable);

+            fieldToLogicalVariableMap.put(fieldName, var);

+            nameToLogicalVariableMap.put(var.toString(), var);

+            variableToType.put(var, type);

+            logicalVariableToFieldMap.put(var, fieldName);

+        }

+        return var;

+    }

 

-	@Override

-	public LogicalVariable getNewVariable(String fieldName, TypeInfo type) {

-		currentVariable++;

-		LogicalVariable var = new LogicalVariable(currentVariable);

-		fieldToLogicalVariableMap.put(fieldName, var);

-		nameToLogicalVariableMap.put(var.toString(), var);

-		variableToType.put(var, type);

-		logicalVariableToFieldMap.put(var, fieldName);

-		return var;

-	}

+    @Override

+    public LogicalVariable getNewVariable(String fieldName, TypeInfo type) {

+        currentVariable++;

+        LogicalVariable var = new LogicalVariable(currentVariable);

+        fieldToLogicalVariableMap.put(fieldName, var);

+        nameToLogicalVariableMap.put(var.toString(), var);

+        variableToType.put(var, type);

+        logicalVariableToFieldMap.put(var, fieldName);

+        return var;

+    }

 

-	@Override

-	public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) {

-		String name = this.logicalVariableToFieldMap.get(oldVar);

-		if (name != null) {

-			fieldToLogicalVariableMap.put(name, newVar);

-			nameToLogicalVariableMap.put(newVar.toString(), newVar);

-			nameToLogicalVariableMap.put(oldVar.toString(), newVar);

-			logicalVariableToFieldMap.put(newVar, name);

-		}

-	}

+    @Override

+    public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) {

+        String name = this.logicalVariableToFieldMap.get(oldVar);

+        if (name != null) {

+            fieldToLogicalVariableMap.put(name, newVar);

+            nameToLogicalVariableMap.put(newVar.toString(), newVar);

+            nameToLogicalVariableMap.put(oldVar.toString(), newVar);

+            logicalVariableToFieldMap.put(newVar, name);

+        }

+    }

 

-	@Override

-	public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() {

-		return metaData;

-	}

+    @Override

+    public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() {

+        return metaData;

+    }

 

-	/**

-	 * only get an variable, without rewriting it

-	 * 

-	 * @param fieldName

-	 * @return

-	 */

-	private LogicalVariable getVariableOnly(String fieldName) {

-		return fieldToLogicalVariableMap.get(fieldName);

-	}

+    /**

+     * only get a variable, without rewriting it

+     * 

+     * @param fieldName

+     * @return

+     */

+    private LogicalVariable getVariableOnly(String fieldName) {

+        return fieldToLogicalVariableMap.get(fieldName);

+    }

 

-	private void updateVariable(String fieldName, LogicalVariable variable) {

-		LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);

-		if (var == null) {

-			fieldToLogicalVariableMap.put(fieldName, variable);

-			nameToLogicalVariableMap.put(fieldName, variable);

-		} else if (!var.equals(variable)) {

-			// System.out.println("!!!replace variables!!!");

-			fieldToLogicalVariableMap.put(fieldName, variable);

-			nameToLogicalVariableMap.put(fieldName, variable);

-		}

-	}

+    private void updateVariable(String fieldName, LogicalVariable variable) {

+        LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);

+        if (var == null) {

+            fieldToLogicalVariableMap.put(fieldName, variable);

+            nameToLogicalVariableMap.put(fieldName, variable);

+        } else if (!var.equals(variable)) {

+            // System.out.println("!!!replace variables!!!");

+            fieldToLogicalVariableMap.put(fieldName, variable);

+            nameToLogicalVariableMap.put(fieldName, variable);

+        }

+    }

 

-	/**

-	 * get a list of logical variables from the schema

-	 * 

-	 * @param schema

-	 * @return

-	 */

-	@Override

-	public List<LogicalVariable> getVariablesFromSchema(Schema schema) {

-		List<LogicalVariable> variables = new ArrayList<LogicalVariable>();

-		List<String> names = schema.getNames();

+    /**

+     * get a list of logical variables from the schema

+     * 

+     * @param schema

+     * @return

+     */

+    @Override

+    public List<LogicalVariable> getVariablesFromSchema(Schema schema) {

+        List<LogicalVariable> variables = new ArrayList<LogicalVariable>();

+        List<String> names = schema.getNames();

 

-		for (String name : names)

-			variables.add(nameToLogicalVariableMap.get(name));

-		return variables;

-	}

+        for (String name : names)

+            variables.add(nameToLogicalVariableMap.get(name));

+        return variables;

+    }

 

-	/**

-	 * get variable to typeinfo map

-	 * 

-	 * @return

-	 */

-	public HashMap<LogicalVariable, TypeInfo> getVariableContext() {

-		return this.variableToType;

-	}

+    /**

+     * get variable to typeinfo map

+     * 

+     * @return

+     */

+    public HashMap<LogicalVariable, TypeInfo> getVariableContext() {

+        return this.variableToType;

+    }

 

-	/**

-	 * get the number of variables

-	 * s

-	 * @return

-	 */

-	public int getVariableCounter() {

-		return currentVariable + 1;

-	}

+    /**

+     * get the number of variables

+     *

+     * 

+     * @return

+     */

+    public int getVariableCounter() {

+        return currentVariable + 1;

+    }

 

-	/**

-	 * translate from hive operator tree to asterix operator tree

-	 * 

-	 * @param hive

-	 *            roots

-	 * @return Algebricks roots

-	 */

-	public void translate(List<Operator> hiveRoot,

-			ILogicalOperator parentOperator,

-			HashMap<String, PartitionDesc> aliasToPathMap)

-			throws AlgebricksException {

-		/**

-		 * register visitors

-		 */

-		visitors.add(new FilterVisitor());

-		visitors.add(new GroupByVisitor());

-		visitors.add(new JoinVisitor());

-		visitors.add(new LateralViewJoinVisitor());

-		visitors.add(new UnionVisitor());

-		visitors.add(new LimitVisitor());

-		visitors.add(new MapJoinVisitor());

-		visitors.add(new ProjectVisitor());

-		visitors.add(new SortVisitor());

-		visitors.add(new ExtractVisitor());

-		visitors.add(new TableScanWriteVisitor(aliasToPathMap));

+    /**

+     * translate from hive operator tree to asterix operator tree

+     * 

+     * @param hive

+     *            roots

+     * @return Algebricks roots

+     */

+    public void translate(List<Operator> hiveRoot, ILogicalOperator parentOperator,

+            HashMap<String, PartitionDesc> aliasToPathMap) throws AlgebricksException {

+        /**

+         * register visitors

+         */

+        visitors.add(new FilterVisitor());

+        visitors.add(new GroupByVisitor());

+        visitors.add(new JoinVisitor());

+        visitors.add(new LateralViewJoinVisitor());

+        visitors.add(new UnionVisitor());

+        visitors.add(new LimitVisitor());

+        visitors.add(new MapJoinVisitor());

+        visitors.add(new ProjectVisitor());

+        visitors.add(new SortVisitor());

+        visitors.add(new ExtractVisitor());

+        visitors.add(new TableScanWriteVisitor(aliasToPathMap));

 

-		List<Mutable<ILogicalOperator>> refList = translate(hiveRoot,

-				new MutableObject<ILogicalOperator>(parentOperator));

-		insertReplicateOperator(refList);

-		if (refList != null)

-			rootOperators.addAll(refList);

-	}

+        List<Mutable<ILogicalOperator>> refList = translate(hiveRoot, new MutableObject<ILogicalOperator>(

+                parentOperator));

+        insertReplicateOperator(refList);

+        if (refList != null)

+            rootOperators.addAll(refList);

+    }

 

-	/**

-	 * translate operator DAG

-	 * 

-	 * @param hiveRoot

-	 * @param AlgebricksParentOperator

-	 * @return

-	 */

-	private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot,

-			Mutable<ILogicalOperator> AlgebricksParentOperator)

-			throws AlgebricksException {

+    /**

+     * translate operator DAG

+     * 

+     * @param hiveRoot

+     * @param AlgebricksParentOperator

+     * @return

+     */

+    private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot,

+            Mutable<ILogicalOperator> AlgebricksParentOperator) throws AlgebricksException {

 

-		for (Operator hiveOperator : hiveRoot) {

-			continueTraverse = true;

-			Mutable<ILogicalOperator> currentOperatorRef = null;

-			if (hiveOperator.getType() == OperatorType.FILTER) {

-				FilterOperator fop = (FilterOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-			} else if (hiveOperator.getType() == OperatorType.REDUCESINK) {

-				ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-			} else if (hiveOperator.getType() == OperatorType.JOIN) {

-				JoinOperator fop = (JoinOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null) {

-						continueTraverse = true;

-						break;

-					} else

-						continueTraverse = false;

-				}

-				if (currentOperatorRef == null)

-					return null;

-			} else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) {

-				LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-				if (currentOperatorRef == null)

-					return null;

-			} else if (hiveOperator.getType() == OperatorType.MAPJOIN) {

-				MapJoinOperator fop = (MapJoinOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null) {

-						continueTraverse = true;

-						break;

-					} else

-						continueTraverse = false;

-				}

-				if (currentOperatorRef == null)

-					return null;

-			} else if (hiveOperator.getType() == OperatorType.SELECT) {

-				SelectOperator fop = (SelectOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-			} else if (hiveOperator.getType() == OperatorType.EXTRACT) {

-				ExtractOperator fop = (ExtractOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-			} else if (hiveOperator.getType() == OperatorType.GROUPBY) {

-				GroupByOperator fop = (GroupByOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-			} else if (hiveOperator.getType() == OperatorType.TABLESCAN) {

-				TableScanOperator fop = (TableScanOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-			} else if (hiveOperator.getType() == OperatorType.FILESINK) {

-				FileSinkOperator fop = (FileSinkOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(fop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-			} else if (hiveOperator.getType() == OperatorType.LIMIT) {

-				LimitOperator lop = (LimitOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(lop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-			} else if (hiveOperator.getType() == OperatorType.UDTF) {

-				UDTFOperator lop = (UDTFOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(lop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null)

-						break;

-				}

-			} else if (hiveOperator.getType() == OperatorType.UNION) {

-				UnionOperator lop = (UnionOperator) hiveOperator;

-				for (Visitor visitor : visitors) {

-					currentOperatorRef = visitor.visit(lop,

-							AlgebricksParentOperator, this);

-					if (currentOperatorRef != null) {

-						continueTraverse = true;

-						break;

-					} else

-						continueTraverse = false;

-				}

-			} else

-				;

-			if (hiveOperator.getChildOperators() != null

-					&& hiveOperator.getChildOperators().size() > 0

-					&& continueTraverse) {

-				@SuppressWarnings("unchecked")

-				List<Operator> children = hiveOperator.getChildOperators();

-				if (currentOperatorRef == null)

-					currentOperatorRef = AlgebricksParentOperator;

-				translate(children, currentOperatorRef);

-			}

-			if (hiveOperator.getChildOperators() == null

-					|| hiveOperator.getChildOperators().size() == 0)

-				logicalOp.add(currentOperatorRef);

-		}

-		return logicalOp;

-	}

+        for (Operator hiveOperator : hiveRoot) {

+            continueTraverse = true;

+            Mutable<ILogicalOperator> currentOperatorRef = null;

+            if (hiveOperator.getType() == OperatorType.FILTER) {

+                FilterOperator fop = (FilterOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+            } else if (hiveOperator.getType() == OperatorType.REDUCESINK) {

+                ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+            } else if (hiveOperator.getType() == OperatorType.JOIN) {

+                JoinOperator fop = (JoinOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null) {

+                        continueTraverse = true;

+                        break;

+                    } else

+                        continueTraverse = false;

+                }

+                if (currentOperatorRef == null)

+                    return null;

+            } else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) {

+                LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+                if (currentOperatorRef == null)

+                    return null;

+            } else if (hiveOperator.getType() == OperatorType.MAPJOIN) {

+                MapJoinOperator fop = (MapJoinOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null) {

+                        continueTraverse = true;

+                        break;

+                    } else

+                        continueTraverse = false;

+                }

+                if (currentOperatorRef == null)

+                    return null;

+            } else if (hiveOperator.getType() == OperatorType.SELECT) {

+                SelectOperator fop = (SelectOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+            } else if (hiveOperator.getType() == OperatorType.EXTRACT) {

+                ExtractOperator fop = (ExtractOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+            } else if (hiveOperator.getType() == OperatorType.GROUPBY) {

+                GroupByOperator fop = (GroupByOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+            } else if (hiveOperator.getType() == OperatorType.TABLESCAN) {

+                TableScanOperator fop = (TableScanOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+            } else if (hiveOperator.getType() == OperatorType.FILESINK) {

+                FileSinkOperator fop = (FileSinkOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+            } else if (hiveOperator.getType() == OperatorType.LIMIT) {

+                LimitOperator lop = (LimitOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+            } else if (hiveOperator.getType() == OperatorType.UDTF) {

+                UDTFOperator lop = (UDTFOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null)

+                        break;

+                }

+            } else if (hiveOperator.getType() == OperatorType.UNION) {

+                UnionOperator lop = (UnionOperator) hiveOperator;

+                for (Visitor visitor : visitors) {

+                    currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);

+                    if (currentOperatorRef != null) {

+                        continueTraverse = true;

+                        break;

+                    } else

+                        continueTraverse = false;

+                }

+            } else

+                ;

+            if (hiveOperator.getChildOperators() != null && hiveOperator.getChildOperators().size() > 0

+                    && continueTraverse) {

+                @SuppressWarnings("unchecked")

+                List<Operator> children = hiveOperator.getChildOperators();

+                if (currentOperatorRef == null)

+                    currentOperatorRef = AlgebricksParentOperator;

+                translate(children, currentOperatorRef);

+            }

+            if (hiveOperator.getChildOperators() == null || hiveOperator.getChildOperators().size() == 0)

+                logicalOp.add(currentOperatorRef);

+        }

+        return logicalOp;

+    }

 

-	/**

-	 * used in select, group by to get no-column-expression columns

-	 * 

-	 * @param cols

-	 * @return

-	 */

-	public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent,

-			List<ExprNodeDesc> cols, ArrayList<LogicalVariable> variables) {

+    /**

+     * used in select, group by to get no-column-expression columns

+     * 

+     * @param cols

+     * @return

+     */

+    public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent, List<ExprNodeDesc> cols,

+            ArrayList<LogicalVariable> variables) {

 

-		ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();

+        ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();

 

-		/**

-		 * variables to be appended in the assign operator

-		 */

-		ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>();

+        /**

+         * variables to be appended in the assign operator

+         */

+        ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>();

 

-		// one variable can only be assigned once

-		for (ExprNodeDesc hiveExpr : cols) {

-			rewriteExpression(hiveExpr);

+        // one variable can only be assigned once

+        for (ExprNodeDesc hiveExpr : cols) {

+            rewriteExpression(hiveExpr);

 

-			if (hiveExpr instanceof ExprNodeColumnDesc) {

-				ExprNodeColumnDesc desc2 = (ExprNodeColumnDesc) hiveExpr;

-				String fieldName = desc2.getTabAlias() + "."

-						+ desc2.getColumn();

+            if (hiveExpr instanceof ExprNodeColumnDesc) {

+                ExprNodeColumnDesc desc2 = (ExprNodeColumnDesc) hiveExpr;

+                String fieldName = desc2.getTabAlias() + "." + desc2.getColumn();

 

-				// System.out.println("project expr: " + fieldName);

+                // System.out.println("project expr: " + fieldName);

 

-				if (fieldName.indexOf("$$") < 0) {

-					LogicalVariable var = getVariable(fieldName,

-							hiveExpr.getTypeInfo());

-					desc2.setColumn(var.toString());

-					desc2.setTabAlias("");

-					variables.add(var);

-				} else {

-					LogicalVariable var = nameToLogicalVariableMap.get(desc2

-							.getColumn());

-					String name = this.logicalVariableToFieldMap.get(var);

-					var = this.getVariableOnly(name);

-					variables.add(var);

-				}

-			} else {

-				Mutable<ILogicalExpression> asterixExpr = translateScalarFucntion(hiveExpr);

-				expressions.add(asterixExpr);

-				LogicalVariable var = getVariable(hiveExpr.getExprString()

-						+ asterixExpr.hashCode(), hiveExpr.getTypeInfo());

-				variables.add(var);

-				appendedVariables.add(var);

-			}

-		}

+                if (fieldName.indexOf("$$") < 0) {

+                    LogicalVariable var = getVariable(fieldName, hiveExpr.getTypeInfo());

+                    desc2.setColumn(var.toString());

+                    desc2.setTabAlias("");

+                    variables.add(var);

+                } else {

+                    LogicalVariable var = nameToLogicalVariableMap.get(desc2.getColumn());

+                    String name = this.logicalVariableToFieldMap.get(var);

+                    var = this.getVariableOnly(name);

+                    variables.add(var);

+                }

+            } else {

+                Mutable<ILogicalExpression> asterixExpr = translateScalarFucntion(hiveExpr);

+                expressions.add(asterixExpr);

+                LogicalVariable var = getVariable(hiveExpr.getExprString() + asterixExpr.hashCode(),

+                        hiveExpr.getTypeInfo());

+                variables.add(var);

+                appendedVariables.add(var);

+            }

+        }

 

-		/**

-		 * create an assign operator to deal with appending

-		 */

-		ILogicalOperator assignOp = null;

-		if (appendedVariables.size() > 0) {

-			assignOp = new AssignOperator(appendedVariables, expressions);

-			assignOp.getInputs().add(parent);

-		}

-		return assignOp;

-	}

+        /**

+         * create an assign operator to deal with appending

+         */

+        ILogicalOperator assignOp = null;

+        if (appendedVariables.size() > 0) {

+            assignOp = new AssignOperator(appendedVariables, expressions);

+            assignOp.getInputs().add(parent);

+        }

+        return assignOp;

+    }

 

-	private ILogicalPlan plan;

+    private ILogicalPlan plan;

 

-	public ILogicalPlan genLogicalPlan() {

-		plan = new ALogicalPlanImpl(rootOperators);

-		return plan;

-	}

+    public ILogicalPlan genLogicalPlan() {

+        plan = new ALogicalPlanImpl(rootOperators);

+        return plan;

+    }

 

-	public void printOperators() throws AlgebricksException {

-		LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();

-		StringBuilder buffer = new StringBuilder();

-		PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);

-		outputWriter.println(buffer);

-		outputWriter.println("rewritten variables: ");

-		outputWriter.flush();

-		printVariables();

+    public void printOperators() throws AlgebricksException {

+        LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();

+        StringBuilder buffer = new StringBuilder();

+        PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);

+        outputWriter.println(buffer);

+        outputWriter.println("rewritten variables: ");

+        outputWriter.flush();

+        printVariables();

 

-	}

+    }

 

-	public static void setOutputPrinter(PrintWriter writer) {

-		outputWriter = writer;

-	}

+    public static void setOutputPrinter(PrintWriter writer) {

+        outputWriter = writer;

+    }

 

-	private void printVariables() {

-		Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap

-				.entrySet();

+    private void printVariables() {

+        Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap.entrySet();

 

-		for (Entry<String, LogicalVariable> entry : entries) {

-			outputWriter.println(entry.getKey() + " -> " + entry.getValue());

-		}

-		outputWriter.flush();

-	}

+        for (Entry<String, LogicalVariable> entry : entries) {

+            outputWriter.println(entry.getKey() + " -> " + entry.getValue());

+        }

+        outputWriter.flush();

+    }

 

-	/**

-	 * generate the object inspector for the output of an operator

-	 * 

-	 * @param operator

-	 *            The Hive operator

-	 * @return an ObjectInspector object

-	 */

-	public Schema generateInputSchema(Operator operator) {

-		List<String> variableNames = new ArrayList<String>();

-		List<TypeInfo> typeList = new ArrayList<TypeInfo>();

-		List<ColumnInfo> columns = operator.getSchema().getSignature();

+    /**

+     * generate the object inspector for the output of an operator

+     * 

+     * @param operator

+     *            The Hive operator

+     * @return an ObjectInspector object

+     */

+    public Schema generateInputSchema(Operator operator) {

+        List<String> variableNames = new ArrayList<String>();

+        List<TypeInfo> typeList = new ArrayList<TypeInfo>();

+        List<ColumnInfo> columns = operator.getSchema().getSignature();

 

-		for (ColumnInfo col : columns) {

-			// typeList.add();

-			TypeInfo type = col.getType();

-			typeList.add(type);

+        for (ColumnInfo col : columns) {

+            // typeList.add();

+            TypeInfo type = col.getType();

+            typeList.add(type);

 

-			String fieldName = col.getInternalName();

-			variableNames.add(fieldName);

-		}

+            String fieldName = col.getInternalName();

+            variableNames.add(fieldName);

+        }

 

-		return new Schema(variableNames, typeList);

-	}

+        return new Schema(variableNames, typeList);

+    }

 

-	/**

-	 * rewrite the names of output columns for feature expression evaluators to

-	 * use

-	 * 

-	 * @param operator

-	 */

-	public void rewriteOperatorOutputSchema(Operator operator) {

-		List<ColumnInfo> columns = operator.getSchema().getSignature();

+    /**

+     * rewrite the names of output columns for feature expression evaluators to

+     * use

+     * 

+     * @param operator

+     */

+    public void rewriteOperatorOutputSchema(Operator operator) {

+        List<ColumnInfo> columns = operator.getSchema().getSignature();

 

-		for (ColumnInfo column : columns) {

-			String columnName = column.getTabAlias() + "."

-					+ column.getInternalName();

-			if (columnName.indexOf("$$") < 0) {

-				LogicalVariable var = getVariable(columnName, column.getType());

-				column.setInternalName(var.toString());

-			}

-		}

-	}

+        for (ColumnInfo column : columns) {

+            String columnName = column.getTabAlias() + "." + column.getInternalName();

+            if (columnName.indexOf("$$") < 0) {

+                LogicalVariable var = getVariable(columnName, column.getType());

+                column.setInternalName(var.toString());

+            }

+        }

+    }

 

-	@Override

-	public void rewriteOperatorOutputSchema(List<LogicalVariable> variables,

-			Operator operator) {

+    @Override

+    public void rewriteOperatorOutputSchema(List<LogicalVariable> variables, Operator operator) {

 

-		printOperatorSchema(operator);

-		List<ColumnInfo> columns = operator.getSchema().getSignature();

-		if (variables.size() != columns.size()) {

-			System.out.println("output cardinality error " + operator.getName()

-					+ " variable size: " + variables.size() + " expected "

-					+ columns.size());

-		}

+        printOperatorSchema(operator);

+        List<ColumnInfo> columns = operator.getSchema().getSignature();

+        if (variables.size() != columns.size()) {

+            throw new IllegalStateException("output cardinality error " + operator.getName() + " variable size: "

+                    + variables.size() + " expected " + columns.size());

+        }

 

-		for (int i = 0; i < variables.size(); i++) {

-			LogicalVariable var = variables.get(i);

-			ColumnInfo column = columns.get(i);

-			String fieldName = column.getTabAlias() + "."

-					+ column.getInternalName();

-			if (fieldName.indexOf("$$") < 0) {

-				updateVariable(fieldName, var);

-				column.setInternalName(var.toString());

-			}

-		}

-		printOperatorSchema(operator);

-	}

+        for (int i = 0; i < variables.size(); i++) {

+            LogicalVariable var = variables.get(i);

+            ColumnInfo column = columns.get(i);

+            String fieldName = column.getTabAlias() + "." + column.getInternalName();

+            if (fieldName.indexOf("$$") < 0) {

+                updateVariable(fieldName, var);

+                column.setInternalName(var.toString());

+            }

+        }

+        printOperatorSchema(operator);

+    }

 

-	/**

-	 * rewrite an expression and substitute variables

-	 * 

-	 * @param expr

-	 *            hive expression

-	 */

-	public void rewriteExpression(ExprNodeDesc expr) {

-		if (expr instanceof ExprNodeColumnDesc) {

-			ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;

-			String fieldName = desc.getTabAlias() + "." + desc.getColumn();

-			if (fieldName.indexOf("$$") < 0) {

-				LogicalVariable var = getVariableOnly(fieldName);

-				if (var == null) {

-					fieldName = "." + desc.getColumn();

-					var = getVariableOnly(fieldName);

-					if (var == null) {

-						fieldName = "null." + desc.getColumn();

-						var = getVariableOnly(fieldName);

-						if (var == null) {

-							System.out.println(fieldName + " is wrong!!! ");

-						}

-					}

-				}

-				String name = this.logicalVariableToFieldMap.get(var);

-				var = getVariableOnly(name);

-				desc.setColumn(var.toString());

-			}

-		} else {

-			if (expr.getChildren() != null && expr.getChildren().size() > 0) {

-				List<ExprNodeDesc> children = expr.getChildren();

-				for (ExprNodeDesc desc : children)

-					rewriteExpression(desc);

-			}

-		}

-	}

+    /**

+     * rewrite an expression and substitute variables

+     * 

+     * @param expr

+     *            hive expression

+     */

+    public void rewriteExpression(ExprNodeDesc expr) {

+        if (expr instanceof ExprNodeColumnDesc) {

+            ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;

+            String fieldName = desc.getTabAlias() + "." + desc.getColumn();

+            if (fieldName.indexOf("$$") < 0) {

+                LogicalVariable var = getVariableOnly(fieldName);

+                if (var == null) {

+                    fieldName = "." + desc.getColumn();

+                    var = getVariableOnly(fieldName);

+                    if (var == null) {

+                        fieldName = "null." + desc.getColumn();

+                        var = getVariableOnly(fieldName);

+                        if (var == null) {

+                            throw new IllegalStateException(fieldName + " is wrong!!! ");

+                        }

+                    }

+                }

+                String name = this.logicalVariableToFieldMap.get(var);

+                var = getVariableOnly(name);

+                desc.setColumn(var.toString());

+            }

+        } else {

+            if (expr.getChildren() != null && expr.getChildren().size() > 0) {

+                List<ExprNodeDesc> children = expr.getChildren();

+                for (ExprNodeDesc desc : children)

+                    rewriteExpression(desc);

+            }

+        }

+    }

 

-	/**

-	 * rewrite an expression and substitute variables

-	 * 

-	 * @param expr

-	 *            hive expression

-	 */

-	public void rewriteExpressionPartial(ExprNodeDesc expr) {

-		if (expr instanceof ExprNodeColumnDesc) {

-			ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;

-			String fieldName = desc.getTabAlias() + "." + desc.getColumn();

-			if (fieldName.indexOf("$$") < 0) {

-				LogicalVariable var = getVariableOnly(fieldName);

-				desc.setColumn(var.toString());

-			}

-		} else {

-			if (expr.getChildren() != null && expr.getChildren().size() > 0) {

-				List<ExprNodeDesc> children = expr.getChildren();

-				for (ExprNodeDesc desc : children)

-					rewriteExpressionPartial(desc);

-			}

-		}

-	}

+    /**

+     * rewrite an expression and substitute variables

+     * 

+     * @param expr

+     *            hive expression

+     */

+    public void rewriteExpressionPartial(ExprNodeDesc expr) {

+        if (expr instanceof ExprNodeColumnDesc) {

+            ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;

+            String fieldName = desc.getTabAlias() + "." + desc.getColumn();

+            if (fieldName.indexOf("$$") < 0) {

+                LogicalVariable var = getVariableOnly(fieldName);

+                desc.setColumn(var.toString());

+            }

+        } else {

+            if (expr.getChildren() != null && expr.getChildren().size() > 0) {

+                List<ExprNodeDesc> children = expr.getChildren();

+                for (ExprNodeDesc desc : children)

+                    rewriteExpressionPartial(desc);

+            }

+        }

+    }

 

-	private void printOperatorSchema(Operator operator) {

-		System.out.println(operator.getName());

-		List<ColumnInfo> columns = operator.getSchema().getSignature();

-		for (ColumnInfo column : columns) {

-			System.out.print(column.getTabAlias() + "."

-					+ column.getInternalName() + "  ");

-		}

-		System.out.println();

-	}

+    private void printOperatorSchema(Operator operator) {

+        System.out.println(operator.getName());

+        List<ColumnInfo> columns = operator.getSchema().getSignature();

+        for (ColumnInfo column : columns) {

+            System.out.print(column.getTabAlias() + "." + column.getInternalName() + "  ");

+        }

+        System.out.println();

+    }

 

-	/**

-	 * translate scalar function expression

-	 * 

-	 * @param hiveExpr

-	 * @return

-	 */

-	public Mutable<ILogicalExpression> translateScalarFucntion(

-			ExprNodeDesc hiveExpr) {

-		ILogicalExpression AlgebricksExpr;

+    /**

+     * translate scalar function expression

+     * 

+     * @param hiveExpr

+     * @return

+     */

+    public Mutable<ILogicalExpression> translateScalarFucntion(ExprNodeDesc hiveExpr) {

+        ILogicalExpression AlgebricksExpr;

 

-		if (hiveExpr instanceof ExprNodeGenericFuncDesc) {

-			List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();

-			List<ExprNodeDesc> children = hiveExpr.getChildren();

+        if (hiveExpr instanceof ExprNodeGenericFuncDesc) {

+            List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();

+            List<ExprNodeDesc> children = hiveExpr.getChildren();

 

-			for (ExprNodeDesc child : children)

-				arguments.add(translateScalarFucntion(child));

+            for (ExprNodeDesc child : children)

+                arguments.add(translateScalarFucntion(child));

 

-			ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr;

-			GenericUDF genericUdf = funcExpr.getGenericUDF();

-			UDF udf = null;

-			if (genericUdf instanceof GenericUDFBridge) {

-				GenericUDFBridge bridge = (GenericUDFBridge) genericUdf;

-				try {

-					udf = bridge.getUdfClass().newInstance();

-				} catch (Exception e) {

-					e.printStackTrace();

-				}

-			}

+            ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr;

+            GenericUDF genericUdf = funcExpr.getGenericUDF();

+            UDF udf = null;

+            if (genericUdf instanceof GenericUDFBridge) {

+                GenericUDFBridge bridge = (GenericUDFBridge) genericUdf;

+                try {

+                    udf = bridge.getUdfClass().newInstance();

+                } catch (Exception e) {

+                    e.printStackTrace();

+                }

+            }

 

-			/**

-			 * set up the hive function

-			 */

-			Object hiveFunction = genericUdf;

-			if (udf != null)

-				hiveFunction = udf;

+            /**

+             * set up the hive function

+             */

+            Object hiveFunction = genericUdf;

+            if (udf != null)

+                hiveFunction = udf;

 

-			FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE

-					.getAlgebricksFunctionId(hiveFunction.getClass());

-			if (funcId == null) {

-				funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,

-						hiveFunction.getClass().getName());

-			}

+            FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE.getAlgebricksFunctionId(hiveFunction

+                    .getClass());

+            if (funcId == null) {

+                funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, hiveFunction.getClass().getName());

+            }

 

-			Object functionInfo = null;

-			if (genericUdf instanceof GenericUDFBridge) {

-				functionInfo = funcExpr;

-			}

+            Object functionInfo = null;

+            if (genericUdf instanceof GenericUDFBridge) {

+                functionInfo = funcExpr;

+            }

 

-			/**

-			 * generate the function call expression

-			 */

-			ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(

-					new HiveFunctionInfo(funcId, functionInfo), arguments);

-			AlgebricksExpr = AlgebricksFuncExpr;

+            /**

+             * generate the function call expression

+             */

+            ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(

+                    funcId, functionInfo), arguments);

+            AlgebricksExpr = AlgebricksFuncExpr;

 

-		} else if (hiveExpr instanceof ExprNodeColumnDesc) {

-			ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr;

-			LogicalVariable var = this.getVariable(column.getColumn());

-			AlgebricksExpr = new VariableReferenceExpression(var);

+        } else if (hiveExpr instanceof ExprNodeColumnDesc) {

+            ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr;

+            LogicalVariable var = this.getVariable(column.getColumn());

+            AlgebricksExpr = new VariableReferenceExpression(var);

 

-		} else if (hiveExpr instanceof ExprNodeFieldDesc) {

-			FunctionIdentifier funcId;

-			funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,

-					ExpressionConstant.FIELDACCESS);

+        } else if (hiveExpr instanceof ExprNodeFieldDesc) {

+            FunctionIdentifier funcId;

+            funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.FIELDACCESS);

 

-			ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(

-					new HiveFunctionInfo(funcId, hiveExpr));

-			AlgebricksExpr = AlgebricksFuncExpr;

-		} else if (hiveExpr instanceof ExprNodeConstantDesc) {

-			ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr;

-			Object value = hiveConst.getValue();

-			AlgebricksExpr = new ConstantExpression(

-					new HivesterixConstantValue(value));

-		} else if (hiveExpr instanceof ExprNodeNullDesc) {

-			FunctionIdentifier funcId;

-			funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,

-					ExpressionConstant.NULL);

+            ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(

+                    funcId, hiveExpr));

+            AlgebricksExpr = AlgebricksFuncExpr;

+        } else if (hiveExpr instanceof ExprNodeConstantDesc) {

+            ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr;

+            Object value = hiveConst.getValue();

+            AlgebricksExpr = new ConstantExpression(new HivesterixConstantValue(value));

+        } else if (hiveExpr instanceof ExprNodeNullDesc) {

+            FunctionIdentifier funcId;

+            funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.NULL);

 

-			ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(

-					new HiveFunctionInfo(funcId, hiveExpr));

+            ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(

+                    funcId, hiveExpr));

 

-			AlgebricksExpr = AlgebricksFuncExpr;

-		} else {

-			throw new IllegalStateException("unknown hive expression");

-		}

-		return new MutableObject<ILogicalExpression>(AlgebricksExpr);

-	}

+            AlgebricksExpr = AlgebricksFuncExpr;

+        } else {

+            throw new IllegalStateException("unknown hive expression");

+        }

+        return new MutableObject<ILogicalExpression>(AlgebricksExpr);

+    }

 

-	/**

-	 * translate aggregation function expression

-	 * 

-	 * @param aggregateDesc

-	 * @return

-	 */

-	public Mutable<ILogicalExpression> translateAggregation(

-			AggregationDesc aggregateDesc) {

+    /**

+     * translate aggregation function expression

+     * 

+     * @param aggregateDesc

+     * @return

+     */

+    public Mutable<ILogicalExpression> translateAggregation(AggregationDesc aggregateDesc) {

 

-		String UDAFName = aggregateDesc.getGenericUDAFName();

+        String UDAFName = aggregateDesc.getGenericUDAFName();

 

-		List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();

-		List<ExprNodeDesc> children = aggregateDesc.getParameters();

+        List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();

+        List<ExprNodeDesc> children = aggregateDesc.getParameters();

 

-		for (ExprNodeDesc child : children)

-			arguments.add(translateScalarFucntion(child));

+        for (ExprNodeDesc child : children)

+            arguments.add(translateScalarFucntion(child));

 

-		FunctionIdentifier funcId = new FunctionIdentifier(

-				ExpressionConstant.NAMESPACE, UDAFName + "("

-						+ aggregateDesc.getMode() + ")");

-		HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc);

-		AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(

-				funcInfo, false, arguments);

-		return new MutableObject<ILogicalExpression>(aggregationExpression);

-	}

+        FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDAFName + "("

+                + aggregateDesc.getMode() + ")");

+        HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc);

+        AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(funcInfo, false,

+                arguments);

+        return new MutableObject<ILogicalExpression>(aggregationExpression);

+    }

 

-	/**

-	 * translate aggregation function expression

-	 * 

-	 * @param aggregator

-	 * @return

-	 */

-	public Mutable<ILogicalExpression> translateUnnestFunction(

-			UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) {

+    /**

+     * translate unnest (UDTF) function expression

+     * 

+     * @param udtfDesc

+     * @return

+     */

+    public Mutable<ILogicalExpression> translateUnnestFunction(UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) {

 

-		String UDTFName = udtfDesc.getUDTFName();

+        String UDTFName = udtfDesc.getUDTFName();

 

-		FunctionIdentifier funcId = new FunctionIdentifier(

-				ExpressionConstant.NAMESPACE, UDTFName);

-		UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(

-				new HiveFunctionInfo(funcId, udtfDesc));

-		unnestingExpression.getArguments().add(argument);

-		return new MutableObject<ILogicalExpression>(unnestingExpression);

-	}

+        FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDTFName);

+        UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(new HiveFunctionInfo(

+                funcId, udtfDesc));

+        unnestingExpression.getArguments().add(argument);

+        return new MutableObject<ILogicalExpression>(unnestingExpression);

+    }

 

-	/**

-	 * get typeinfo

-	 */

-	@Override

-	public TypeInfo getType(LogicalVariable var) {

-		return variableToType.get(var);

-	}

+    /**

+     * get typeinfo

+     */

+    @Override

+    public TypeInfo getType(LogicalVariable var) {

+        return variableToType.get(var);

+    }

 

-	/**

-	 * get variable from variable name

-	 */

-	@Override

-	public LogicalVariable getVariable(String name) {

-		return nameToLogicalVariableMap.get(name);

-	}

+    /**

+     * get variable from variable name

+     */

+    @Override

+    public LogicalVariable getVariable(String name) {

+        return nameToLogicalVariableMap.get(name);

+    }

 

-	@Override

-	public LogicalVariable getVariableFromFieldName(String fieldName) {

-		return this.getVariableOnly(fieldName);

-	}

+    @Override

+    public LogicalVariable getVariableFromFieldName(String fieldName) {

+        return this.getVariableOnly(fieldName);

+    }

 

-	/**

-	 * set the metadata provider

-	 */

-	@Override

-	public void setMetadataProvider(

-			IMetadataProvider<PartitionDesc, Object> metadata) {

-		this.metaData = metadata;

-	}

+    /**

+     * set the metadata provider

+     */

+    @Override

+    public void setMetadataProvider(IMetadataProvider<PartitionDesc, Object> metadata) {

+        this.metaData = metadata;

+    }

 

-	/**

-	 * insert ReplicateOperator when necessary

-	 */

-	private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) {

-		Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();

-		buildChildToParentsMapping(roots, childToParentsMap);

-		for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap

-				.entrySet()) {

-			List<Mutable<ILogicalOperator>> pList = entry.getValue();

-			if (pList.size() > 1) {

-				ILogicalOperator rop = new ReplicateOperator(pList.size());

-				Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(

-						rop);

-				Mutable<ILogicalOperator> childRef = entry.getKey();

-				rop.getInputs().add(childRef);

-				for (Mutable<ILogicalOperator> parentRef : pList) {

-					ILogicalOperator parentOp = parentRef.getValue();

-					int index = parentOp.getInputs().indexOf(childRef);

-					parentOp.getInputs().set(index, ropRef);

-				}

-			}

-		}

-	}

+    /**

+     * insert ReplicateOperator when necessary

+     */

+    private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) {

+        Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();

+        buildChildToParentsMapping(roots, childToParentsMap);

+        for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap.entrySet()) {

+            List<Mutable<ILogicalOperator>> pList = entry.getValue();

+            if (pList.size() > 1) {

+                ILogicalOperator rop = new ReplicateOperator(pList.size());

+                Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);

+                Mutable<ILogicalOperator> childRef = entry.getKey();

+                rop.getInputs().add(childRef);

+                for (Mutable<ILogicalOperator> parentRef : pList) {

+                    ILogicalOperator parentOp = parentRef.getValue();

+                    int index = parentOp.getInputs().indexOf(childRef);

+                    parentOp.getInputs().set(index, ropRef);

+                }

+            }

+        }

+    }

 

-	/**

-	 * build the mapping from child to Parents

-	 * 

-	 * @param roots

-	 * @param childToParentsMap

-	 */

-	private void buildChildToParentsMapping(

-			List<Mutable<ILogicalOperator>> roots,

-			Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) {

-		for (Mutable<ILogicalOperator> opRef : roots) {

-			List<Mutable<ILogicalOperator>> childRefs = opRef.getValue()

-					.getInputs();

-			for (Mutable<ILogicalOperator> childRef : childRefs) {

-				List<Mutable<ILogicalOperator>> parentList = map.get(childRef);

-				if (parentList == null) {

-					parentList = new ArrayList<Mutable<ILogicalOperator>>();

-					map.put(childRef, parentList);

-				}

-				if (!parentList.contains(opRef))

-					parentList.add(opRef);

-			}

-			buildChildToParentsMapping(childRefs, map);

-		}

-	}

+    /**

+     * build the mapping from child to Parents

+     * 

+     * @param roots

+     * @param childToParentsMap

+     */

+    private void buildChildToParentsMapping(List<Mutable<ILogicalOperator>> roots,

+            Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) {

+        for (Mutable<ILogicalOperator> opRef : roots) {

+            List<Mutable<ILogicalOperator>> childRefs = opRef.getValue().getInputs();

+            for (Mutable<ILogicalOperator> childRef : childRefs) {

+                List<Mutable<ILogicalOperator>> parentList = map.get(childRef);

+                if (parentList == null) {

+                    parentList = new ArrayList<Mutable<ILogicalOperator>>();

+                    map.put(childRef, parentList);

+                }

+                if (!parentList.contains(opRef))

+                    parentList.add(opRef);

+            }

+            buildChildToParentsMapping(childRefs, map);

+        }

+    }

 }

diff --git a/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index 2d8829c..6c1ac72 100644
--- a/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy.Policy;

 import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryComparatorFactoryProvider;

 import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFactoryProvider;

+import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFamilyProvider;

 import edu.uci.ics.hivesterix.runtime.provider.HiveNormalizedKeyComputerFactoryProvider;

 import edu.uci.ics.hivesterix.runtime.provider.HivePrinterFactoryProvider;

 import edu.uci.ics.hivesterix.runtime.provider.HiveSerializerDeserializerProvider;

@@ -71,527 +72,483 @@
 @SuppressWarnings({ "rawtypes", "unchecked" })

 public class HyracksExecutionEngine implements IExecutionEngine {

 

-	private static final Log LOG = LogFactory

-			.getLog(HyracksExecutionEngine.class.getName());

+    private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());

 

-	// private static final String[] locConstraints = {}

+    // private static final String[] locConstraints = {}

 

-	private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

-	private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

-	static {

-		SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(

-				false);

-		SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(

-				true);

-		SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(

-				true);

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlFullDfs, HiveRuleCollections.NORMALIZATION));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlNoDfs,

-						HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlFullDfs, HiveRuleCollections.LOAD_FIELDS));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlNoDfs, HiveRuleCollections.OP_PUSHDOWN));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqOnceCtrl, HiveRuleCollections.DATA_EXCHANGE));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlNoDfs, HiveRuleCollections.CONSOLIDATION));

+    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

+    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

+    static {

+        SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);

+        SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(true);

+        SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,

+                HiveRuleCollections.NORMALIZATION));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,

+                HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,

+                HiveRuleCollections.LOAD_FIELDS));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,

+                HiveRuleCollections.OP_PUSHDOWN));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,

+                HiveRuleCollections.DATA_EXCHANGE));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,

+                HiveRuleCollections.CONSOLIDATION));

 

-		DEFAULT_PHYSICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqOnceCtrl, HiveRuleCollections.PHYSICAL_PLAN_REWRITES));

-		DEFAULT_PHYSICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqOnceCtrl, HiveRuleCollections.prepareJobGenRules));

-	}

+        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,

+                HiveRuleCollections.PHYSICAL_PLAN_REWRITES));

+        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,

+                HiveRuleCollections.prepareJobGenRules));

+    }

 

-	/**

-	 * static configurations for compiler

-	 */

-	private HeuristicCompilerFactoryBuilder builder;

+    /**

+     * static configurations for compiler

+     */

+    private HeuristicCompilerFactoryBuilder builder;

 

-	/**

-	 * compiler

-	 */

-	private ICompiler compiler;

+    /**

+     * compiler

+     */

+    private ICompiler compiler;

 

-	/**

-	 * physical optimization config

-	 */

-	private PhysicalOptimizationConfig physicalOptimizationConfig;

+    /**

+     * physical optimization config

+     */

+    private PhysicalOptimizationConfig physicalOptimizationConfig;

 

-	/**

-	 * final ending operators

-	 */

-	private List<Operator> leaveOps = new ArrayList<Operator>();

+    /**

+     * final ending operators

+     */

+    private List<Operator> leaveOps = new ArrayList<Operator>();

 

-	/**

-	 * tasks that are already visited

-	 */

-	private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();

+    /**

+     * tasks that are already visited

+     */

+    private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();

 

-	/**

-	 * hyracks job spec

-	 */

-	private JobSpecification jobSpec;

+    /**

+     * hyracks job spec

+     */

+    private JobSpecification jobSpec;

 

-	/**

-	 * hive configuration

-	 */

-	private HiveConf conf;

+    /**

+     * hive configuration

+     */

+    private HiveConf conf;

 

-	/**

-	 * plan printer

-	 */

-	private PrintWriter planPrinter;

+    /**

+     * plan printer

+     */

+    private PrintWriter planPrinter;

 

-	public HyracksExecutionEngine(HiveConf conf) {

-		this.conf = conf;

-		init(conf);

-	}

+    public HyracksExecutionEngine(HiveConf conf) {

+        this.conf = conf;

+        init(conf);

+    }

 

-	public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {

-		this.conf = conf;

-		this.planPrinter = planPrinter;

-		init(conf);

-	}

+    public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {

+        this.conf = conf;

+        this.planPrinter = planPrinter;

+        init(conf);

+    }

 

-	private void init(HiveConf conf) {

-		builder = new HeuristicCompilerFactoryBuilder(

-				DefaultOptimizationContextFactory.INSTANCE);

-		builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);

-		builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);

-		builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);

-		builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);

-		builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);

+    private void init(HiveConf conf) {

+        builder = new HeuristicCompilerFactoryBuilder(DefaultOptimizationContextFactory.INSTANCE);

+        builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);

+        builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);

+        builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);

+        builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);

+        builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);

 

-		long memSizeExternalGby = conf.getLong(

-				"hive.algebricks.groupby.external.memory", 268435456);

-		long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory",

-				536870912);

-		int frameSize = conf.getInt("hive.algebricks.framesize", 32768);

+        long memSizeExternalGby = conf.getLong("hive.algebricks.groupby.external.memory", 268435456);

+        long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory", 536870912);

+        int frameSize = conf.getInt("hive.algebricks.framesize", 32768);

 

-		physicalOptimizationConfig = new PhysicalOptimizationConfig();

-		int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);

-		physicalOptimizationConfig

-				.setMaxFramesExternalGroupBy(frameLimitExtGby);

-		int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);

-		physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);

-		builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);

-	}

+        physicalOptimizationConfig = new PhysicalOptimizationConfig();

+        int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);

+        physicalOptimizationConfig.setMaxFramesExternalGroupBy(frameLimitExtGby);

+        int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);

+        physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);

+        builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);

+    }

 

-	@Override

-	public int compileJob(List<Task<? extends Serializable>> rootTasks) {

-		// clean up

-		leaveOps.clear();

-		tasksVisited.clear();

-		jobSpec = null;

+    @Override

+    public int compileJob(List<Task<? extends Serializable>> rootTasks) {

+        // clean up

+        leaveOps.clear();

+        tasksVisited.clear();

+        jobSpec = null;

 

-		HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();

-		List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);

+        HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();

+        List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);

 

-		// get all leave Ops

-		getLeaves(rootOps, leaveOps);

+        // get all leave Ops

+        getLeaves(rootOps, leaveOps);

 

-		HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();

-		try {

-			translator.translate(rootOps, null, aliasToPath);

+        HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();

+        try {

+            translator.translate(rootOps, null, aliasToPath);

 

-			ILogicalPlan plan = translator.genLogicalPlan();

+            ILogicalPlan plan = translator.genLogicalPlan();

 

-			if (plan.getRoots() != null && plan.getRoots().size() > 0

-					&& plan.getRoots().get(0).getValue() != null) {

-				translator.printOperators();

-				System.out.println("translate complete");

-				ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(

-						plan, translator.getMetadataProvider());

+            if (plan.getRoots() != null && plan.getRoots().size() > 0 && plan.getRoots().get(0).getValue() != null) {

+                translator.printOperators();

+                ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(plan,

+                        translator.getMetadataProvider());

 

-				ICompilerFactory compilerFactory = builder.create();

-				compiler = compilerFactory.createCompiler(

-						planAndMetadata.getPlan(),

-						planAndMetadata.getMetadataProvider(),

-						translator.getVariableCounter());

+                ICompilerFactory compilerFactory = builder.create();

+                compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),

+                        planAndMetadata.getMetadataProvider(), translator.getVariableCounter());

 

-				// run optimization and re-writing rules for Hive plan

-				compiler.optimize();

+                // run optimization and re-writing rules for Hive plan

+                compiler.optimize();

 

-				// print optimized plan

-				LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();

-				StringBuilder buffer = new StringBuilder();

-				PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);

-				String planStr = buffer.toString();

-				System.out.println(planStr);

+                // print optimized plan

+                LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();

+                StringBuilder buffer = new StringBuilder();

+                PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);

+                String planStr = buffer.toString();

+                System.out.println(planStr);

 

-				if (planPrinter != null)

-					planPrinter.print(planStr);

-			}

-		} catch (Exception e) {

-			e.printStackTrace();

-			return 1;

-		}

+                if (planPrinter != null)

+                    planPrinter.print(planStr);

+            }

+        } catch (Exception e) {

+            e.printStackTrace();

+            return 1;

+        }

 

-		return 0;

-	}

+        return 0;

+    }

 

-	private void codeGen() throws AlgebricksException {

-		// number of cpu cores in the cluster

-		builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(

-				ConfUtil.getNCs()));

-		// builder.setClusterTopology(ConfUtil.getClusterTopology());

-		builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);

-		builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);

-		builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);

-		builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);

-		builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);

-		builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);

-		builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);

-		builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);

-		builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);

-		builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);

-		builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);

+    private void codeGen() throws AlgebricksException {

+        // number of cpu cores in the cluster

+        builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(ConfUtil.getNCs()));

+        // builder.setClusterTopology(ConfUtil.getClusterTopology());

+        builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);

+        builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);

+        builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);

+        builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);

+        builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);

+        builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);

+        builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);

+        builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);

+        builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);

+        builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);

+        builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);

+        builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);

 

-		jobSpec = compiler.createJob(null);

+        jobSpec = compiler.createJob(null);

 

-		// set the policy

-		String policyStr = conf.get("hive.hyracks.connectorpolicy");

-		if (policyStr == null)

-			policyStr = "PIPELINING";

-		Policy policyValue = Policy.valueOf(policyStr);

-		jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(

-				policyValue));

+        // set the policy

+        String policyStr = conf.get("hive.hyracks.connectorpolicy");

+        if (policyStr == null)

+            policyStr = "PIPELINING";

+        Policy policyValue = Policy.valueOf(policyStr);

+        jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(policyValue));

+        jobSpec.setUseConnectorPolicyForScheduling(false);

+    }

 

-		// execute the job

-		System.out.println(jobSpec.toString());

-	}

+    @Override

+    public int executeJob() {

+        try {

+            codeGen();

+            executeHyraxJob(jobSpec);

+        } catch (Exception e) {

+            e.printStackTrace();

+            return 1;

+        }

+        return 0;

+    }

 

-	@Override

-	public int executeJob() {

-		try {

-			codeGen();

-			executeHyraxJob(jobSpec);

-		} catch (Exception e) {

-			e.printStackTrace();

-			return 1;

-		}

-		return 0;

-	}

+    private List<Operator> generateRootOperatorDAG(List<Task<? extends Serializable>> rootTasks,

+            HashMap<String, PartitionDesc> aliasToPath) {

 

-	private List<Operator> generateRootOperatorDAG(

-			List<Task<? extends Serializable>> rootTasks,

-			HashMap<String, PartitionDesc> aliasToPath) {

+        List<Operator> rootOps = new ArrayList<Operator>();

+        List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();

+        tasksVisited.clear();

 

-		List<Operator> rootOps = new ArrayList<Operator>();

-		List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();

-		tasksVisited.clear();

+        for (int i = rootTasks.size() - 1; i >= 0; i--) {

+            /**

+             * list of map-reduce tasks

+             */

+            Task<? extends Serializable> task = rootTasks.get(i);

+            // System.out.println("!" + task.getName());

 

-		for (int i = rootTasks.size() - 1; i >= 0; i--) {

-			/**

-			 * list of map-reduce tasks

-			 */

-			Task<? extends Serializable> task = rootTasks.get(i);

-			// System.out.println("!" + task.getName());

+            if (task instanceof MapRedTask) {

+                List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);

+                if (i == 0)

+                    rootOps.addAll(mapRootOps);

+                else {

+                    List<Operator> leaves = new ArrayList<Operator>();

+                    getLeaves(rootOps, leaves);

 

-			if (task instanceof MapRedTask) {

-				List<Operator> mapRootOps = articulateMapReduceOperators(task,

-						rootOps, aliasToPath, rootTasks);

-				if (i == 0)

-					rootOps.addAll(mapRootOps);

-				else {

-					List<Operator> leaves = new ArrayList<Operator>();

-					getLeaves(rootOps, leaves);

+                    List<Operator> mapChildren = new ArrayList<Operator>();

+                    for (Operator childMap : mapRootOps) {

+                        if (childMap instanceof TableScanOperator) {

+                            TableScanDesc topDesc = (TableScanDesc) childMap.getConf();

+                            if (topDesc == null)

+                                mapChildren.add(childMap);

+                            else {

+                                rootOps.add(childMap);

+                            }

+                        } else

+                            mapChildren.add(childMap);

+                    }

 

-					List<Operator> mapChildren = new ArrayList<Operator>();

-					for (Operator childMap : mapRootOps) {

-						if (childMap instanceof TableScanOperator) {

-							TableScanDesc topDesc = (TableScanDesc) childMap

-									.getConf();

-							if (topDesc == null)

-								mapChildren.add(childMap);

-							else {

-								rootOps.add(childMap);

-							}

-						} else

-							mapChildren.add(childMap);

-					}

+                    if (mapChildren.size() > 0) {

+                        for (Operator leaf : leaves)

+                            leaf.setChildOperators(mapChildren);

+                        for (Operator child : mapChildren)

+                            child.setParentOperators(leaves);

+                    }

+                }

 

-					if (mapChildren.size() > 0) {

-						for (Operator leaf : leaves)

-							leaf.setChildOperators(mapChildren);

-						for (Operator child : mapChildren)

-							child.setParentOperators(leaves);

-					}

-				}

+                MapredWork mr = (MapredWork) task.getWork();

+                HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

 

-				MapredWork mr = (MapredWork) task.getWork();

-				HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

+                addAliasToPartition(aliasToPath, map);

+                toDelete.add(task);

+            }

+        }

 

-				addAliasToPartition(aliasToPath, map);

-				toDelete.add(task);

-			}

-		}

+        for (Task<? extends Serializable> task : toDelete)

+            rootTasks.remove(task);

 

-		for (Task<? extends Serializable> task : toDelete)

-			rootTasks.remove(task);

-

-		return rootOps;

-	}

+        return rootOps;

+    }

 

-	private void addAliasToPartition(

-			HashMap<String, PartitionDesc> aliasToPath,

-			HashMap<String, PartitionDesc> map) {

-		Iterator<String> keys = map.keySet().iterator();

-		while (keys.hasNext()) {

-			String key = keys.next();

-			PartitionDesc part = map.get(key);

-			String[] names = key.split(":");

-			for (String name : names) {

-				aliasToPath.put(name, part);

-			}

-		}

-	}

+    private void addAliasToPartition(HashMap<String, PartitionDesc> aliasToPath, HashMap<String, PartitionDesc> map) {

+        Iterator<String> keys = map.keySet().iterator();

+        while (keys.hasNext()) {

+            String key = keys.next();

+            PartitionDesc part = map.get(key);

+            String[] names = key.split(":");

+            for (String name : names) {

+                aliasToPath.put(name, part);

+            }

+        }

+    }

 

-	private List<Operator> articulateMapReduceOperators(Task task,

-			List<Operator> rootOps, HashMap<String, PartitionDesc> aliasToPath,

-			List<Task<? extends Serializable>> rootTasks) {

-		// System.out.println("!"+task.getName());

-		if (!(task instanceof MapRedTask)) {

-			if (!(task instanceof ConditionalTask)) {

-				rootTasks.add(task);

-				return null;

-			} else {

-				// remove map-reduce branches in condition task

-				ConditionalTask condition = (ConditionalTask) task;

-				List<Task<? extends Serializable>> branches = condition

-						.getListTasks();

-				for (int i = branches.size() - 1; i >= 0; i--) {

-					Task branch = branches.get(i);

-					if (branch instanceof MapRedTask) {

-						return articulateMapReduceOperators(branch, rootOps,

-								aliasToPath, rootTasks);

-					}

-				}

-				rootTasks.add(task);

-				return null;

-			}

-		}

+    private List<Operator> articulateMapReduceOperators(Task task, List<Operator> rootOps,

+            HashMap<String, PartitionDesc> aliasToPath, List<Task<? extends Serializable>> rootTasks) {

+        // System.out.println("!"+task.getName());

+        if (!(task instanceof MapRedTask)) {

+            if (!(task instanceof ConditionalTask)) {

+                rootTasks.add(task);

+                return null;

+            } else {

+                // remove map-reduce branches in condition task

+                ConditionalTask condition = (ConditionalTask) task;

+                List<Task<? extends Serializable>> branches = condition.getListTasks();

+                for (int i = branches.size() - 1; i >= 0; i--) {

+                    Task branch = branches.get(i);

+                    if (branch instanceof MapRedTask) {

+                        return articulateMapReduceOperators(branch, rootOps, aliasToPath, rootTasks);

+                    }

+                }

+                rootTasks.add(task);

+                return null;

+            }

+        }

 

-		MapredWork mr = (MapredWork) task.getWork();

-		HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

+        MapredWork mr = (MapredWork) task.getWork();

+        HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

 

-		// put all aliasToParitionDesc mapping into the map

-		addAliasToPartition(aliasToPath, map);

+        // put all aliasToParitionDesc mapping into the map

+        addAliasToPartition(aliasToPath, map);

 

-		MapRedTask mrtask = (MapRedTask) task;

-		MapredWork work = (MapredWork) mrtask.getWork();

-		HashMap<String, Operator<? extends Serializable>> operators = work

-				.getAliasToWork();

+        MapRedTask mrtask = (MapRedTask) task;

+        MapredWork work = (MapredWork) mrtask.getWork();

+        HashMap<String, Operator<? extends Serializable>> operators = work.getAliasToWork();

 

-		Set entries = operators.entrySet();

-		Iterator<Entry<String, Operator>> iterator = entries.iterator();

-		List<Operator> mapRootOps = new ArrayList<Operator>();

+        Set entries = operators.entrySet();

+        Iterator<Entry<String, Operator>> iterator = entries.iterator();

+        List<Operator> mapRootOps = new ArrayList<Operator>();

 

-		// get map root operators

-		while (iterator.hasNext()) {

-			Operator next = iterator.next().getValue();

-			if (!mapRootOps.contains(next)) {

-				// clear that only for the case of union

-				mapRootOps.add(next);

-			}

-		}

+        // get map root operators

+        while (iterator.hasNext()) {

+            Operator next = iterator.next().getValue();

+            if (!mapRootOps.contains(next)) {

+                // clear that only for the case of union

+                mapRootOps.add(next);

+            }

+        }

 

-		// get map local work

-		MapredLocalWork localWork = work.getMapLocalWork();

-		if (localWork != null) {

-			HashMap<String, Operator<? extends Serializable>> localOperators = localWork

-					.getAliasToWork();

+        // get map local work

+        MapredLocalWork localWork = work.getMapLocalWork();

+        if (localWork != null) {

+            HashMap<String, Operator<? extends Serializable>> localOperators = localWork.getAliasToWork();

 

-			Set localEntries = localOperators.entrySet();

-			Iterator<Entry<String, Operator>> localIterator = localEntries

-					.iterator();

-			while (localIterator.hasNext()) {

-				mapRootOps.add(localIterator.next().getValue());

-			}

+            Set localEntries = localOperators.entrySet();

+            Iterator<Entry<String, Operator>> localIterator = localEntries.iterator();

+            while (localIterator.hasNext()) {

+                mapRootOps.add(localIterator.next().getValue());

+            }

 

-			HashMap<String, FetchWork> localFetch = localWork

-					.getAliasToFetchWork();

-			Set localFetchEntries = localFetch.entrySet();

-			Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries

-					.iterator();

-			while (localFetchIterator.hasNext()) {

-				Entry<String, FetchWork> fetchMap = localFetchIterator.next();

-				FetchWork fetch = fetchMap.getValue();

-				String alias = fetchMap.getKey();

-				List<PartitionDesc> dirPart = fetch.getPartDesc();

+            HashMap<String, FetchWork> localFetch = localWork.getAliasToFetchWork();

+            Set localFetchEntries = localFetch.entrySet();

+            Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries.iterator();

+            while (localFetchIterator.hasNext()) {

+                Entry<String, FetchWork> fetchMap = localFetchIterator.next();

+                FetchWork fetch = fetchMap.getValue();

+                String alias = fetchMap.getKey();

+                List<PartitionDesc> dirPart = fetch.getPartDesc();

 

-				// temporary hack: put the first partitionDesc into the map

-				aliasToPath.put(alias, dirPart.get(0));

-			}

-		}

+                // temporary hack: put the first partitionDesc into the map

+                aliasToPath.put(alias, dirPart.get(0));

+            }

+        }

 

-		Boolean visited = tasksVisited.get(task);

-		if (visited != null && visited.booleanValue() == true) {

-			return mapRootOps;

-		}

+        Boolean visited = tasksVisited.get(task);

+        if (visited != null && visited.booleanValue() == true) {

+            return mapRootOps;

+        }

 

-		// do that only for union operator

-		for (Operator op : mapRootOps)

-			if (op.getParentOperators() != null)

-				op.getParentOperators().clear();

+        // do that only for union operator

+        for (Operator op : mapRootOps)

+            if (op.getParentOperators() != null)

+                op.getParentOperators().clear();

 

-		List<Operator> mapLeaves = new ArrayList<Operator>();

-		downToLeaves(mapRootOps, mapLeaves);

-		List<Operator> reduceOps = new ArrayList<Operator>();

+        List<Operator> mapLeaves = new ArrayList<Operator>();

+        downToLeaves(mapRootOps, mapLeaves);

+        List<Operator> reduceOps = new ArrayList<Operator>();

 

-		if (work.getReducer() != null)

-			reduceOps.add(work.getReducer());

+        if (work.getReducer() != null)

+            reduceOps.add(work.getReducer());

 

-		for (Operator mapLeaf : mapLeaves) {

-			mapLeaf.setChildOperators(reduceOps);

-		}

+        for (Operator mapLeaf : mapLeaves) {

+            mapLeaf.setChildOperators(reduceOps);

+        }

 

-		for (Operator reduceOp : reduceOps) {

-			if (reduceOp != null)

-				reduceOp.setParentOperators(mapLeaves);

-		}

+        for (Operator reduceOp : reduceOps) {

+            if (reduceOp != null)

+                reduceOp.setParentOperators(mapLeaves);

+        }

 

-		List<Operator> leafs = new ArrayList<Operator>();

-		if (reduceOps.size() > 0) {

-			downToLeaves(reduceOps, leafs);

-		} else {

-			leafs = mapLeaves;

-		}

+        List<Operator> leafs = new ArrayList<Operator>();

+        if (reduceOps.size() > 0) {

+            downToLeaves(reduceOps, leafs);

+        } else {

+            leafs = mapLeaves;

+        }

 

-		List<Operator> mapChildren = new ArrayList<Operator>();

-		if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {

-			System.out.println("have child tasks!!");

-			for (Object child : task.getChildTasks()) {

-				System.out.println(child.getClass().getName());

-				List<Operator> childMapOps = articulateMapReduceOperators(

-						(Task) child, rootOps, aliasToPath, rootTasks);

-				if (childMapOps == null)

-					continue;

+        List<Operator> mapChildren = new ArrayList<Operator>();

+        if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {

+            for (Object child : task.getChildTasks()) {

+                List<Operator> childMapOps = articulateMapReduceOperators((Task) child, rootOps, aliasToPath, rootTasks);

+                if (childMapOps == null)

+                    continue;

 

-				for (Operator childMap : childMapOps) {

-					if (childMap instanceof TableScanOperator) {

-						TableScanDesc topDesc = (TableScanDesc) childMap

-								.getConf();

-						if (topDesc == null)

-							mapChildren.add(childMap);

-						else {

-							rootOps.add(childMap);

-						}

-					} else {

-						// if not table scan, add the child

-						mapChildren.add(childMap);

-					}

-				}

-			}

+                for (Operator childMap : childMapOps) {

+                    if (childMap instanceof TableScanOperator) {

+                        TableScanDesc topDesc = (TableScanDesc) childMap.getConf();

+                        if (topDesc == null)

+                            mapChildren.add(childMap);

+                        else {

+                            rootOps.add(childMap);

+                        }

+                    } else {

+                        // if not table scan, add the child

+                        mapChildren.add(childMap);

+                    }

+                }

+            }

 

-			if (mapChildren.size() > 0) {

-				int i = 0;

-				for (Operator leaf : leafs) {

-					if (leaf.getChildOperators() == null

-							|| leaf.getChildOperators().size() == 0)

-						leaf.setChildOperators(new ArrayList<Operator>());

-					leaf.getChildOperators().add(mapChildren.get(i));

-					i++;

-				}

-				i = 0;

-				for (Operator child : mapChildren) {

-					if (child.getParentOperators() == null

-							|| child.getParentOperators().size() == 0)

-						child.setParentOperators(new ArrayList<Operator>());

-					child.getParentOperators().add(leafs.get(i));

-					i++;

-				}

-			}

-		}

+            if (mapChildren.size() > 0) {

+                int i = 0;

+                for (Operator leaf : leafs) {

+                    if (leaf.getChildOperators() == null || leaf.getChildOperators().size() == 0)

+                        leaf.setChildOperators(new ArrayList<Operator>());

+                    leaf.getChildOperators().add(mapChildren.get(i));

+                    i++;

+                }

+                i = 0;

+                for (Operator child : mapChildren) {

+                    if (child.getParentOperators() == null || child.getParentOperators().size() == 0)

+                        child.setParentOperators(new ArrayList<Operator>());

+                    child.getParentOperators().add(leafs.get(i));

+                    i++;

+                }

+            }

+        }

 

-		// mark this task as visited

-		this.tasksVisited.put(task, true);

-		return mapRootOps;

-	}

+        // mark this task as visited

+        this.tasksVisited.put(task, true);

+        return mapRootOps;

+    }

 

-	/**

-	 * down to leaf nodes

-	 * 

-	 * @param ops

-	 * @param leaves

-	 */

-	private void downToLeaves(List<Operator> ops, List<Operator> leaves) {

+    /**

+     * down to leaf nodes

+     * 

+     * @param ops

+     * @param leaves

+     */

+    private void downToLeaves(List<Operator> ops, List<Operator> leaves) {

 

-		// Operator currentOp;

-		for (Operator op : ops) {

-			if (op != null && op.getChildOperators() != null

-					&& op.getChildOperators().size() > 0) {

-				downToLeaves(op.getChildOperators(), leaves);

-			} else {

-				if (op != null && leaves.indexOf(op) < 0)

-					leaves.add(op);

-			}

-		}

-	}

+        // Operator currentOp;

+        for (Operator op : ops) {

+            if (op != null && op.getChildOperators() != null && op.getChildOperators().size() > 0) {

+                downToLeaves(op.getChildOperators(), leaves);

+            } else {

+                if (op != null && leaves.indexOf(op) < 0)

+                    leaves.add(op);

+            }

+        }

+    }

 

-	private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {

-		for (Operator op : roots) {

-			List<Operator> children = op.getChildOperators();

-			if (children == null || children.size() <= 0) {

-				currentLeaves.add(op);

-			} else {

-				getLeaves(children, currentLeaves);

-			}

-		}

-	}

+    private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {

+        for (Operator op : roots) {

+            List<Operator> children = op.getChildOperators();

+            if (children == null || children.size() <= 0) {

+                currentLeaves.add(op);

+            } else {

+                getLeaves(children, currentLeaves);

+            }

+        }

+    }

 

-	private void executeHyraxJob(JobSpecification job) throws Exception {

-		String ipAddress = conf.get("hive.hyracks.host");

-		int port = Integer.parseInt(conf.get("hive.hyracks.port"));

-		String applicationName = conf.get("hive.hyracks.app");

-		System.out.println("connect to " + ipAddress + " " + port);

+    private void executeHyraxJob(JobSpecification job) throws Exception {

+        String ipAddress = conf.get("hive.hyracks.host");

+        int port = Integer.parseInt(conf.get("hive.hyracks.port"));

+        String applicationName = conf.get("hive.hyracks.app");

+        //System.out.println("connect to " + ipAddress + " " + port);

 

-		IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);

+        IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);

 

-		System.out.println("get connected");

-		long start = System.currentTimeMillis();

-		JobId jobId = hcc.startJob(applicationName, job);

-		hcc.waitForCompletion(jobId);

+        //System.out.println("get connected");

+        long start = System.currentTimeMillis();

+        JobId jobId = hcc.startJob(applicationName, job);

+        hcc.waitForCompletion(jobId);

 

-		System.out.println("job finished: " + jobId.toString());

-		// call all leave nodes to end

-		for (Operator leaf : leaveOps) {

-			jobClose(leaf);

-		}

+        //System.out.println("job finished: " + jobId.toString());

+        // call all leave nodes to end

+        for (Operator leaf : leaveOps) {

+            jobClose(leaf);

+        }

 

-		long end = System.currentTimeMillis();

-		System.err.println(start + " " + end + " " + (end - start));

-	}

+        long end = System.currentTimeMillis();

+        System.err.println(start + " " + end + " " + (end - start));

+    }

 

-	/**

-	 * mv to final directory on hdfs (not real final)

-	 * 

-	 * @param leaf

-	 * @throws Exception

-	 */

-	private void jobClose(Operator leaf) throws Exception {

-		FileSinkOperator fsOp = (FileSinkOperator) leaf;

-		FileSinkDesc desc = fsOp.getConf();

-		boolean isNativeTable = !desc.getTableInfo().isNonNative();

-		if ((conf != null) && isNativeTable) {

-			String specPath = desc.getDirName();

-			DynamicPartitionCtx dpCtx = desc.getDynPartCtx();

-			// for 0.7.0

-			fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);

-			// for 0.8.0

-			// Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,

-			// desc);

-		}

-	}

+    /**

+     * mv to final directory on hdfs (not real final)

+     * 

+     * @param leaf

+     * @throws Exception

+     */

+    private void jobClose(Operator leaf) throws Exception {

+        FileSinkOperator fsOp = (FileSinkOperator) leaf;

+        FileSinkDesc desc = fsOp.getConf();

+        boolean isNativeTable = !desc.getTableInfo().isNonNative();

+        if ((conf != null) && isNativeTable) {

+            String specPath = desc.getDirName();

+            DynamicPartitionCtx dpCtx = desc.getDynPartCtx();

+            // for 0.7.0

+            fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);

+            // for 0.8.0

+            // Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,

+            // desc);

+        }

+    }

 }

diff --git a/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java b/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java
new file mode 100644
index 0000000..760a614
--- /dev/null
+++ b/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java
@@ -0,0 +1,63 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
+
+    public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
+
+    private static final long serialVersionUID = 1L;
+
+    private MurmurHash3BinaryHashFunctionFamily() {
+    }
+
+    private static final int C1 = 0xcc9e2d51;
+    private static final int C2 = 0x1b873593;
+    private static final int C3 = 5;
+    private static final int C4 = 0xe6546b64;
+    private static final int C5 = 0x85ebca6b;
+    private static final int C6 = 0xc2b2ae35;
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+        return new IBinaryHashFunction() {
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                int h = seed;
+                int p = offset;
+                int remain = length;
+                while (remain >= 4) {
+                    int k = (bytes[p] & 0xff) | ((bytes[p + 1] & 0xff) << 8) | ((bytes[p + 2] & 0xff) << 16)
+                            | ((bytes[p + 3] & 0xff) << 24);
+                    k *= C1;
+                    k = Integer.rotateLeft(k, 15);
+                    k *= C2;
+                    h ^= k;
+                    h = Integer.rotateLeft(h, 13);
+                    h = h * C3 + C4;
+                    p += 4;
+                    remain -= 4;
+                }
+                if (remain > 0) {
+                    int k = 0;
+                    for (int i = 0; remain > 0; i += 8) {
+                        k ^= (bytes[p++] & 0xff) << i;
+                        remain--;
+                    }
+                    k *= C1;
+                    k = Integer.rotateLeft(k, 15);
+                    k *= C2;
+                    h ^= k;
+                }
+                h ^= length;
+                h ^= (h >>> 16);
+                h *= C5;
+                h ^= (h >>> 13);
+                h *= C6;
+                h ^= (h >>> 16);
+                return h;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java b/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
index 12222b4..7681bd1 100644
--- a/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
+++ b/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
@@ -175,9 +175,7 @@
 				if (outputColumnsOffset[i] >= 0)

 					outputFieldRefs[outputColumnsOffset[i]] = fieldRefs.get(i);

 

-			long serDeTime = 0;

 			while (reader.next(key, value)) {

-				long start = System.currentTimeMillis();

 				// reuse the tuple builder

 				tb.reset();

 				if (parser != null) {

@@ -205,8 +203,6 @@
 						i++;

 					}

 				}

-				long end = System.currentTimeMillis();

-				serDeTime += (end - start);

 

 				if (!appender.append(tb.getFieldEndOffsets(),

 						tb.getByteArray(), 0, tb.getSize())) {

@@ -221,7 +217,6 @@
 					}

 				}

 			}

-			System.out.println("serde time: " + serDeTime);

 			reader.close();

 			System.gc();

 

diff --git a/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java b/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java
new file mode 100644
index 0000000..e7a2e79
--- /dev/null
+++ b/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.MurmurHash3BinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class HiveBinaryHashFunctionFamilyProvider implements IBinaryHashFunctionFamilyProvider {
+
+    public static HiveBinaryHashFunctionFamilyProvider INSTANCE = new HiveBinaryHashFunctionFamilyProvider();
+
+    private HiveBinaryHashFunctionFamilyProvider() {
+
+    }
+
+    @Override
+    public IBinaryHashFunctionFamily getBinaryHashFunctionFamily(Object type) throws AlgebricksException {
+        return MurmurHash3BinaryHashFunctionFamily.INSTANCE;
+    }
+}