Fix the binary hash function family issue in hivesterix
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging_bigmerge_target@2759 123451ca-8445-de46-9d55-352943316053
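
The functional change is the registration of a binary hash function family provider (needed by hash-based physical operators that require a whole family of hash functions rather than a single factory, e.g. for multi-level hash partitioning); the bulk of the diff is whitespace/indentation normalization. A minimal sketch of the intended wiring, assuming the Algebricks builder exposes a setHashFunctionFamilyProvider hook and that the new provider follows the same INSTANCE singleton pattern as the other Hive*Provider classes (the call itself falls outside the excerpted hunks):

    // Sketch (not part of the hunks shown): wiring the new provider in
    // HyracksExecutionEngine.codeGen(), alongside the factory provider that
    // is visible below. setHashFunctionFamilyProvider and the INSTANCE field
    // are assumed to mirror the other Hive*Provider registrations.
    builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);
    builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);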
diff --git a/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java b/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
index 0485380..1fb973e 100644
--- a/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
+++ b/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
@@ -80,770 +80,729 @@
@SuppressWarnings("rawtypes")
public class HiveAlgebricksTranslator implements Translator {
- private int currentVariable = 0;
+ private int currentVariable = 0;
- private List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>();
+ private List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>();
- private boolean continueTraverse = true;
+ private boolean continueTraverse = true;
- private IMetadataProvider<PartitionDesc, Object> metaData;
+ private IMetadataProvider<PartitionDesc, Object> metaData;
- /**
- * map variable name to the logical variable
- */
- private HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>();
+ /**
+ * map variable name to the logical variable
+ */
+ private HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>();
- /**
- * map field name to LogicalVariable
- */
- private HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>();
+ /**
+ * map field name to LogicalVariable
+ */
+ private HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>();
- /**
- * map logical variable to name
- */
- private HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>();
+ /**
+ * map logical variable to name
+ */
+ private HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>();
- /**
- * asterix root operators
- */
- private List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>();
+ /**
+ * asterix root operators
+ */
+ private List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>();
- /**
- * a list of visitors
- */
- private List<Visitor> visitors = new ArrayList<Visitor>();
+ /**
+ * a list of visitors
+ */
+ private List<Visitor> visitors = new ArrayList<Visitor>();
- /**
- * output writer to print things out
- */
- private static PrintWriter outputWriter = new PrintWriter(
- new OutputStreamWriter(System.out));
+ /**
+ * output writer to print things out
+ */
+ private static PrintWriter outputWriter = new PrintWriter(new OutputStreamWriter(System.out));
- /**
- * map a logical variable to type info
- */
- private HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>();
+ /**
+ * map a logical variable to type info
+ */
+ private HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>();
- @Override
- public LogicalVariable getVariable(String fieldName, TypeInfo type) {
- LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);
- if (var == null) {
- currentVariable++;
- var = new LogicalVariable(currentVariable);
- fieldToLogicalVariableMap.put(fieldName, var);
- nameToLogicalVariableMap.put(var.toString(), var);
- variableToType.put(var, type);
- logicalVariableToFieldMap.put(var, fieldName);
- }
- return var;
- }
+ @Override
+ public LogicalVariable getVariable(String fieldName, TypeInfo type) {
+ LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);
+ if (var == null) {
+ currentVariable++;
+ var = new LogicalVariable(currentVariable);
+ fieldToLogicalVariableMap.put(fieldName, var);
+ nameToLogicalVariableMap.put(var.toString(), var);
+ variableToType.put(var, type);
+ logicalVariableToFieldMap.put(var, fieldName);
+ }
+ return var;
+ }
- @Override
- public LogicalVariable getNewVariable(String fieldName, TypeInfo type) {
- currentVariable++;
- LogicalVariable var = new LogicalVariable(currentVariable);
- fieldToLogicalVariableMap.put(fieldName, var);
- nameToLogicalVariableMap.put(var.toString(), var);
- variableToType.put(var, type);
- logicalVariableToFieldMap.put(var, fieldName);
- return var;
- }
+ @Override
+ public LogicalVariable getNewVariable(String fieldName, TypeInfo type) {
+ currentVariable++;
+ LogicalVariable var = new LogicalVariable(currentVariable);
+ fieldToLogicalVariableMap.put(fieldName, var);
+ nameToLogicalVariableMap.put(var.toString(), var);
+ variableToType.put(var, type);
+ logicalVariableToFieldMap.put(var, fieldName);
+ return var;
+ }
- @Override
- public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) {
- String name = this.logicalVariableToFieldMap.get(oldVar);
- if (name != null) {
- fieldToLogicalVariableMap.put(name, newVar);
- nameToLogicalVariableMap.put(newVar.toString(), newVar);
- nameToLogicalVariableMap.put(oldVar.toString(), newVar);
- logicalVariableToFieldMap.put(newVar, name);
- }
- }
+ @Override
+ public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) {
+ String name = this.logicalVariableToFieldMap.get(oldVar);
+ if (name != null) {
+ fieldToLogicalVariableMap.put(name, newVar);
+ nameToLogicalVariableMap.put(newVar.toString(), newVar);
+ nameToLogicalVariableMap.put(oldVar.toString(), newVar);
+ logicalVariableToFieldMap.put(newVar, name);
+ }
+ }
- @Override
- public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() {
- return metaData;
- }
+ @Override
+ public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() {
+ return metaData;
+ }
- /**
- * only get an variable, without rewriting it
- *
- * @param fieldName
- * @return
- */
- private LogicalVariable getVariableOnly(String fieldName) {
- return fieldToLogicalVariableMap.get(fieldName);
- }
+ /**
+ * only get a variable, without rewriting it
+ *
+ * @param fieldName
+ * @return
+ */
+ private LogicalVariable getVariableOnly(String fieldName) {
+ return fieldToLogicalVariableMap.get(fieldName);
+ }
- private void updateVariable(String fieldName, LogicalVariable variable) {
- LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);
- if (var == null) {
- fieldToLogicalVariableMap.put(fieldName, variable);
- nameToLogicalVariableMap.put(fieldName, variable);
- } else if (!var.equals(variable)) {
- // System.out.println("!!!replace variables!!!");
- fieldToLogicalVariableMap.put(fieldName, variable);
- nameToLogicalVariableMap.put(fieldName, variable);
- }
- }
+ private void updateVariable(String fieldName, LogicalVariable variable) {
+ LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);
+ if (var == null) {
+ fieldToLogicalVariableMap.put(fieldName, variable);
+ nameToLogicalVariableMap.put(fieldName, variable);
+ } else if (!var.equals(variable)) {
+ // System.out.println("!!!replace variables!!!");
+ fieldToLogicalVariableMap.put(fieldName, variable);
+ nameToLogicalVariableMap.put(fieldName, variable);
+ }
+ }
- /**
- * get a list of logical variables from the schema
- *
- * @param schema
- * @return
- */
- @Override
- public List<LogicalVariable> getVariablesFromSchema(Schema schema) {
- List<LogicalVariable> variables = new ArrayList<LogicalVariable>();
- List<String> names = schema.getNames();
+ /**
+ * get a list of logical variables from the schema
+ *
+ * @param schema
+ * @return
+ */
+ @Override
+ public List<LogicalVariable> getVariablesFromSchema(Schema schema) {
+ List<LogicalVariable> variables = new ArrayList<LogicalVariable>();
+ List<String> names = schema.getNames();
- for (String name : names)
- variables.add(nameToLogicalVariableMap.get(name));
- return variables;
- }
+ for (String name : names)
+ variables.add(nameToLogicalVariableMap.get(name));
+ return variables;
+ }
- /**
- * get variable to typeinfo map
- *
- * @return
- */
- public HashMap<LogicalVariable, TypeInfo> getVariableContext() {
- return this.variableToType;
- }
+ /**
+ * get variable to typeinfo map
+ *
+ * @return
+ */
+ public HashMap<LogicalVariable, TypeInfo> getVariableContext() {
+ return this.variableToType;
+ }
- /**
- * get the number of variables
- * s
- * @return
- */
- public int getVariableCounter() {
- return currentVariable + 1;
- }
+ /**
+ * get the number of variables
+ *
+ * @return
+ */
+ public int getVariableCounter() {
+ return currentVariable + 1;
+ }
- /**
- * translate from hive operator tree to asterix operator tree
- *
- * @param hive
- * roots
- * @return Algebricks roots
- */
- public void translate(List<Operator> hiveRoot,
- ILogicalOperator parentOperator,
- HashMap<String, PartitionDesc> aliasToPathMap)
- throws AlgebricksException {
- /**
- * register visitors
- */
- visitors.add(new FilterVisitor());
- visitors.add(new GroupByVisitor());
- visitors.add(new JoinVisitor());
- visitors.add(new LateralViewJoinVisitor());
- visitors.add(new UnionVisitor());
- visitors.add(new LimitVisitor());
- visitors.add(new MapJoinVisitor());
- visitors.add(new ProjectVisitor());
- visitors.add(new SortVisitor());
- visitors.add(new ExtractVisitor());
- visitors.add(new TableScanWriteVisitor(aliasToPathMap));
+ /**
+ * translate from hive operator tree to asterix operator tree
+ *
+ * @param hiveRoot
+ * the Hive root operators
+ */
+ public void translate(List<Operator> hiveRoot, ILogicalOperator parentOperator,
+ HashMap<String, PartitionDesc> aliasToPathMap) throws AlgebricksException {
+ /**
+ * register visitors
+ */
+ visitors.add(new FilterVisitor());
+ visitors.add(new GroupByVisitor());
+ visitors.add(new JoinVisitor());
+ visitors.add(new LateralViewJoinVisitor());
+ visitors.add(new UnionVisitor());
+ visitors.add(new LimitVisitor());
+ visitors.add(new MapJoinVisitor());
+ visitors.add(new ProjectVisitor());
+ visitors.add(new SortVisitor());
+ visitors.add(new ExtractVisitor());
+ visitors.add(new TableScanWriteVisitor(aliasToPathMap));
- List<Mutable<ILogicalOperator>> refList = translate(hiveRoot,
- new MutableObject<ILogicalOperator>(parentOperator));
- insertReplicateOperator(refList);
- if (refList != null)
- rootOperators.addAll(refList);
- }
+ List<Mutable<ILogicalOperator>> refList = translate(hiveRoot, new MutableObject<ILogicalOperator>(
+ parentOperator));
+ insertReplicateOperator(refList);
+ if (refList != null)
+ rootOperators.addAll(refList);
+ }
- /**
- * translate operator DAG
- *
- * @param hiveRoot
- * @param AlgebricksParentOperator
- * @return
- */
- private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot,
- Mutable<ILogicalOperator> AlgebricksParentOperator)
- throws AlgebricksException {
+ /**
+ * translate operator DAG
+ *
+ * @param hiveRoot
+ * @param AlgebricksParentOperator
+ * @return
+ */
+ private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot,
+ Mutable<ILogicalOperator> AlgebricksParentOperator) throws AlgebricksException {
- for (Operator hiveOperator : hiveRoot) {
- continueTraverse = true;
- Mutable<ILogicalOperator> currentOperatorRef = null;
- if (hiveOperator.getType() == OperatorType.FILTER) {
- FilterOperator fop = (FilterOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- } else if (hiveOperator.getType() == OperatorType.REDUCESINK) {
- ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- } else if (hiveOperator.getType() == OperatorType.JOIN) {
- JoinOperator fop = (JoinOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null) {
- continueTraverse = true;
- break;
- } else
- continueTraverse = false;
- }
- if (currentOperatorRef == null)
- return null;
- } else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) {
- LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- if (currentOperatorRef == null)
- return null;
- } else if (hiveOperator.getType() == OperatorType.MAPJOIN) {
- MapJoinOperator fop = (MapJoinOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null) {
- continueTraverse = true;
- break;
- } else
- continueTraverse = false;
- }
- if (currentOperatorRef == null)
- return null;
- } else if (hiveOperator.getType() == OperatorType.SELECT) {
- SelectOperator fop = (SelectOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- } else if (hiveOperator.getType() == OperatorType.EXTRACT) {
- ExtractOperator fop = (ExtractOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- } else if (hiveOperator.getType() == OperatorType.GROUPBY) {
- GroupByOperator fop = (GroupByOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- } else if (hiveOperator.getType() == OperatorType.TABLESCAN) {
- TableScanOperator fop = (TableScanOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- } else if (hiveOperator.getType() == OperatorType.FILESINK) {
- FileSinkOperator fop = (FileSinkOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(fop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- } else if (hiveOperator.getType() == OperatorType.LIMIT) {
- LimitOperator lop = (LimitOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(lop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- } else if (hiveOperator.getType() == OperatorType.UDTF) {
- UDTFOperator lop = (UDTFOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(lop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null)
- break;
- }
- } else if (hiveOperator.getType() == OperatorType.UNION) {
- UnionOperator lop = (UnionOperator) hiveOperator;
- for (Visitor visitor : visitors) {
- currentOperatorRef = visitor.visit(lop,
- AlgebricksParentOperator, this);
- if (currentOperatorRef != null) {
- continueTraverse = true;
- break;
- } else
- continueTraverse = false;
- }
- } else
- ;
- if (hiveOperator.getChildOperators() != null
- && hiveOperator.getChildOperators().size() > 0
- && continueTraverse) {
- @SuppressWarnings("unchecked")
- List<Operator> children = hiveOperator.getChildOperators();
- if (currentOperatorRef == null)
- currentOperatorRef = AlgebricksParentOperator;
- translate(children, currentOperatorRef);
- }
- if (hiveOperator.getChildOperators() == null
- || hiveOperator.getChildOperators().size() == 0)
- logicalOp.add(currentOperatorRef);
- }
- return logicalOp;
- }
+ for (Operator hiveOperator : hiveRoot) {
+ continueTraverse = true;
+ Mutable<ILogicalOperator> currentOperatorRef = null;
+ if (hiveOperator.getType() == OperatorType.FILTER) {
+ FilterOperator fop = (FilterOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.REDUCESINK) {
+ ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.JOIN) {
+ JoinOperator fop = (JoinOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null) {
+ continueTraverse = true;
+ break;
+ } else
+ continueTraverse = false;
+ }
+ if (currentOperatorRef == null)
+ return null;
+ } else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) {
+ LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ if (currentOperatorRef == null)
+ return null;
+ } else if (hiveOperator.getType() == OperatorType.MAPJOIN) {
+ MapJoinOperator fop = (MapJoinOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null) {
+ continueTraverse = true;
+ break;
+ } else
+ continueTraverse = false;
+ }
+ if (currentOperatorRef == null)
+ return null;
+ } else if (hiveOperator.getType() == OperatorType.SELECT) {
+ SelectOperator fop = (SelectOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.EXTRACT) {
+ ExtractOperator fop = (ExtractOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.GROUPBY) {
+ GroupByOperator fop = (GroupByOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.TABLESCAN) {
+ TableScanOperator fop = (TableScanOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.FILESINK) {
+ FileSinkOperator fop = (FileSinkOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.LIMIT) {
+ LimitOperator lop = (LimitOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.UDTF) {
+ UDTFOperator lop = (UDTFOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.UNION) {
+ UnionOperator lop = (UnionOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null) {
+ continueTraverse = true;
+ break;
+ } else
+ continueTraverse = false;
+ }
+ } else
+ ;
+ if (hiveOperator.getChildOperators() != null && hiveOperator.getChildOperators().size() > 0
+ && continueTraverse) {
+ @SuppressWarnings("unchecked")
+ List<Operator> children = hiveOperator.getChildOperators();
+ if (currentOperatorRef == null)
+ currentOperatorRef = AlgebricksParentOperator;
+ translate(children, currentOperatorRef);
+ }
+ if (hiveOperator.getChildOperators() == null || hiveOperator.getChildOperators().size() == 0)
+ logicalOp.add(currentOperatorRef);
+ }
+ return logicalOp;
+ }
- /**
- * used in select, group by to get no-column-expression columns
- *
- * @param cols
- * @return
- */
- public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent,
- List<ExprNodeDesc> cols, ArrayList<LogicalVariable> variables) {
+ /**
+ * used in select and group by to assign variables for non-column expressions
+ *
+ * @param cols
+ * @return
+ */
+ public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent, List<ExprNodeDesc> cols,
+ ArrayList<LogicalVariable> variables) {
- ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
+ ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
- /**
- * variables to be appended in the assign operator
- */
- ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>();
+ /**
+ * variables to be appended in the assign operator
+ */
+ ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>();
- // one variable can only be assigned once
- for (ExprNodeDesc hiveExpr : cols) {
- rewriteExpression(hiveExpr);
+ // one variable can only be assigned once
+ for (ExprNodeDesc hiveExpr : cols) {
+ rewriteExpression(hiveExpr);
- if (hiveExpr instanceof ExprNodeColumnDesc) {
- ExprNodeColumnDesc desc2 = (ExprNodeColumnDesc) hiveExpr;
- String fieldName = desc2.getTabAlias() + "."
- + desc2.getColumn();
+ if (hiveExpr instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc desc2 = (ExprNodeColumnDesc) hiveExpr;
+ String fieldName = desc2.getTabAlias() + "." + desc2.getColumn();
- // System.out.println("project expr: " + fieldName);
+ // System.out.println("project expr: " + fieldName);
- if (fieldName.indexOf("$$") < 0) {
- LogicalVariable var = getVariable(fieldName,
- hiveExpr.getTypeInfo());
- desc2.setColumn(var.toString());
- desc2.setTabAlias("");
- variables.add(var);
- } else {
- LogicalVariable var = nameToLogicalVariableMap.get(desc2
- .getColumn());
- String name = this.logicalVariableToFieldMap.get(var);
- var = this.getVariableOnly(name);
- variables.add(var);
- }
- } else {
- Mutable<ILogicalExpression> asterixExpr = translateScalarFucntion(hiveExpr);
- expressions.add(asterixExpr);
- LogicalVariable var = getVariable(hiveExpr.getExprString()
- + asterixExpr.hashCode(), hiveExpr.getTypeInfo());
- variables.add(var);
- appendedVariables.add(var);
- }
- }
+ if (fieldName.indexOf("$$") < 0) {
+ LogicalVariable var = getVariable(fieldName, hiveExpr.getTypeInfo());
+ desc2.setColumn(var.toString());
+ desc2.setTabAlias("");
+ variables.add(var);
+ } else {
+ LogicalVariable var = nameToLogicalVariableMap.get(desc2.getColumn());
+ String name = this.logicalVariableToFieldMap.get(var);
+ var = this.getVariableOnly(name);
+ variables.add(var);
+ }
+ } else {
+ Mutable<ILogicalExpression> asterixExpr = translateScalarFucntion(hiveExpr);
+ expressions.add(asterixExpr);
+ LogicalVariable var = getVariable(hiveExpr.getExprString() + asterixExpr.hashCode(),
+ hiveExpr.getTypeInfo());
+ variables.add(var);
+ appendedVariables.add(var);
+ }
+ }
- /**
- * create an assign operator to deal with appending
- */
- ILogicalOperator assignOp = null;
- if (appendedVariables.size() > 0) {
- assignOp = new AssignOperator(appendedVariables, expressions);
- assignOp.getInputs().add(parent);
- }
- return assignOp;
- }
+ /**
+ * create an assign operator to deal with appending
+ */
+ ILogicalOperator assignOp = null;
+ if (appendedVariables.size() > 0) {
+ assignOp = new AssignOperator(appendedVariables, expressions);
+ assignOp.getInputs().add(parent);
+ }
+ return assignOp;
+ }
- private ILogicalPlan plan;
+ private ILogicalPlan plan;
- public ILogicalPlan genLogicalPlan() {
- plan = new ALogicalPlanImpl(rootOperators);
- return plan;
- }
+ public ILogicalPlan genLogicalPlan() {
+ plan = new ALogicalPlanImpl(rootOperators);
+ return plan;
+ }
- public void printOperators() throws AlgebricksException {
- LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
- StringBuilder buffer = new StringBuilder();
- PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
- outputWriter.println(buffer);
- outputWriter.println("rewritten variables: ");
- outputWriter.flush();
- printVariables();
+ public void printOperators() throws AlgebricksException {
+ LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
+ StringBuilder buffer = new StringBuilder();
+ PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
+ outputWriter.println(buffer);
+ outputWriter.println("rewritten variables: ");
+ outputWriter.flush();
+ printVariables();
- }
+ }
- public static void setOutputPrinter(PrintWriter writer) {
- outputWriter = writer;
- }
+ public static void setOutputPrinter(PrintWriter writer) {
+ outputWriter = writer;
+ }
- private void printVariables() {
- Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap
- .entrySet();
+ private void printVariables() {
+ Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap.entrySet();
- for (Entry<String, LogicalVariable> entry : entries) {
- outputWriter.println(entry.getKey() + " -> " + entry.getValue());
- }
- outputWriter.flush();
- }
+ for (Entry<String, LogicalVariable> entry : entries) {
+ outputWriter.println(entry.getKey() + " -> " + entry.getValue());
+ }
+ outputWriter.flush();
+ }
- /**
- * generate the object inspector for the output of an operator
- *
- * @param operator
- * The Hive operator
- * @return an ObjectInspector object
- */
- public Schema generateInputSchema(Operator operator) {
- List<String> variableNames = new ArrayList<String>();
- List<TypeInfo> typeList = new ArrayList<TypeInfo>();
- List<ColumnInfo> columns = operator.getSchema().getSignature();
+ /**
+ * generate the schema for the output of an operator
+ *
+ * @param operator
+ * The Hive operator
+ * @return a Schema object
+ */
+ public Schema generateInputSchema(Operator operator) {
+ List<String> variableNames = new ArrayList<String>();
+ List<TypeInfo> typeList = new ArrayList<TypeInfo>();
+ List<ColumnInfo> columns = operator.getSchema().getSignature();
- for (ColumnInfo col : columns) {
- // typeList.add();
- TypeInfo type = col.getType();
- typeList.add(type);
+ for (ColumnInfo col : columns) {
+ // typeList.add();
+ TypeInfo type = col.getType();
+ typeList.add(type);
- String fieldName = col.getInternalName();
- variableNames.add(fieldName);
- }
+ String fieldName = col.getInternalName();
+ variableNames.add(fieldName);
+ }
- return new Schema(variableNames, typeList);
- }
+ return new Schema(variableNames, typeList);
+ }
- /**
- * rewrite the names of output columns for feature expression evaluators to
- * use
- *
- * @param operator
- */
- public void rewriteOperatorOutputSchema(Operator operator) {
- List<ColumnInfo> columns = operator.getSchema().getSignature();
+ /**
+ * rewrite the names of output columns for future expression evaluators to
+ * use
+ *
+ * @param operator
+ */
+ public void rewriteOperatorOutputSchema(Operator operator) {
+ List<ColumnInfo> columns = operator.getSchema().getSignature();
- for (ColumnInfo column : columns) {
- String columnName = column.getTabAlias() + "."
- + column.getInternalName();
- if (columnName.indexOf("$$") < 0) {
- LogicalVariable var = getVariable(columnName, column.getType());
- column.setInternalName(var.toString());
- }
- }
- }
+ for (ColumnInfo column : columns) {
+ String columnName = column.getTabAlias() + "." + column.getInternalName();
+ if (columnName.indexOf("$$") < 0) {
+ LogicalVariable var = getVariable(columnName, column.getType());
+ column.setInternalName(var.toString());
+ }
+ }
+ }
- @Override
- public void rewriteOperatorOutputSchema(List<LogicalVariable> variables,
- Operator operator) {
+ @Override
+ public void rewriteOperatorOutputSchema(List<LogicalVariable> variables, Operator operator) {
- printOperatorSchema(operator);
- List<ColumnInfo> columns = operator.getSchema().getSignature();
- if (variables.size() != columns.size()) {
- System.out.println("output cardinality error " + operator.getName()
- + " variable size: " + variables.size() + " expected "
- + columns.size());
- }
+ printOperatorSchema(operator);
+ List<ColumnInfo> columns = operator.getSchema().getSignature();
+ if (variables.size() != columns.size()) {
+ throw new IllegalStateException("output cardinality error " + operator.getName() + " variable size: "
+ + variables.size() + " expected " + columns.size());
+ }
- for (int i = 0; i < variables.size(); i++) {
- LogicalVariable var = variables.get(i);
- ColumnInfo column = columns.get(i);
- String fieldName = column.getTabAlias() + "."
- + column.getInternalName();
- if (fieldName.indexOf("$$") < 0) {
- updateVariable(fieldName, var);
- column.setInternalName(var.toString());
- }
- }
- printOperatorSchema(operator);
- }
+ for (int i = 0; i < variables.size(); i++) {
+ LogicalVariable var = variables.get(i);
+ ColumnInfo column = columns.get(i);
+ String fieldName = column.getTabAlias() + "." + column.getInternalName();
+ if (fieldName.indexOf("$$") < 0) {
+ updateVariable(fieldName, var);
+ column.setInternalName(var.toString());
+ }
+ }
+ printOperatorSchema(operator);
+ }
- /**
- * rewrite an expression and substitute variables
- *
- * @param expr
- * hive expression
- */
- public void rewriteExpression(ExprNodeDesc expr) {
- if (expr instanceof ExprNodeColumnDesc) {
- ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
- String fieldName = desc.getTabAlias() + "." + desc.getColumn();
- if (fieldName.indexOf("$$") < 0) {
- LogicalVariable var = getVariableOnly(fieldName);
- if (var == null) {
- fieldName = "." + desc.getColumn();
- var = getVariableOnly(fieldName);
- if (var == null) {
- fieldName = "null." + desc.getColumn();
- var = getVariableOnly(fieldName);
- if (var == null) {
- System.out.println(fieldName + " is wrong!!! ");
- }
- }
- }
- String name = this.logicalVariableToFieldMap.get(var);
- var = getVariableOnly(name);
- desc.setColumn(var.toString());
- }
- } else {
- if (expr.getChildren() != null && expr.getChildren().size() > 0) {
- List<ExprNodeDesc> children = expr.getChildren();
- for (ExprNodeDesc desc : children)
- rewriteExpression(desc);
- }
- }
- }
+ /**
+ * rewrite an expression and substitute variables
+ *
+ * @param expr
+ * hive expression
+ */
+ public void rewriteExpression(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
+ String fieldName = desc.getTabAlias() + "." + desc.getColumn();
+ if (fieldName.indexOf("$$") < 0) {
+ LogicalVariable var = getVariableOnly(fieldName);
+ if (var == null) {
+ fieldName = "." + desc.getColumn();
+ var = getVariableOnly(fieldName);
+ if (var == null) {
+ fieldName = "null." + desc.getColumn();
+ var = getVariableOnly(fieldName);
+ if (var == null) {
+ throw new IllegalStateException(fieldName + " is wrong!!! ");
+ }
+ }
+ }
+ String name = this.logicalVariableToFieldMap.get(var);
+ var = getVariableOnly(name);
+ desc.setColumn(var.toString());
+ }
+ } else {
+ if (expr.getChildren() != null && expr.getChildren().size() > 0) {
+ List<ExprNodeDesc> children = expr.getChildren();
+ for (ExprNodeDesc desc : children)
+ rewriteExpression(desc);
+ }
+ }
+ }
- /**
- * rewrite an expression and substitute variables
- *
- * @param expr
- * hive expression
- */
- public void rewriteExpressionPartial(ExprNodeDesc expr) {
- if (expr instanceof ExprNodeColumnDesc) {
- ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
- String fieldName = desc.getTabAlias() + "." + desc.getColumn();
- if (fieldName.indexOf("$$") < 0) {
- LogicalVariable var = getVariableOnly(fieldName);
- desc.setColumn(var.toString());
- }
- } else {
- if (expr.getChildren() != null && expr.getChildren().size() > 0) {
- List<ExprNodeDesc> children = expr.getChildren();
- for (ExprNodeDesc desc : children)
- rewriteExpressionPartial(desc);
- }
- }
- }
+ /**
+ * rewrite an expression and substitute variables
+ *
+ * @param expr
+ * hive expression
+ */
+ public void rewriteExpressionPartial(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
+ String fieldName = desc.getTabAlias() + "." + desc.getColumn();
+ if (fieldName.indexOf("$$") < 0) {
+ LogicalVariable var = getVariableOnly(fieldName);
+ desc.setColumn(var.toString());
+ }
+ } else {
+ if (expr.getChildren() != null && expr.getChildren().size() > 0) {
+ List<ExprNodeDesc> children = expr.getChildren();
+ for (ExprNodeDesc desc : children)
+ rewriteExpressionPartial(desc);
+ }
+ }
+ }
- private void printOperatorSchema(Operator operator) {
- System.out.println(operator.getName());
- List<ColumnInfo> columns = operator.getSchema().getSignature();
- for (ColumnInfo column : columns) {
- System.out.print(column.getTabAlias() + "."
- + column.getInternalName() + " ");
- }
- System.out.println();
- }
+ private void printOperatorSchema(Operator operator) {
+ System.out.println(operator.getName());
+ List<ColumnInfo> columns = operator.getSchema().getSignature();
+ for (ColumnInfo column : columns) {
+ System.out.print(column.getTabAlias() + "." + column.getInternalName() + " ");
+ }
+ System.out.println();
+ }
- /**
- * translate scalar function expression
- *
- * @param hiveExpr
- * @return
- */
- public Mutable<ILogicalExpression> translateScalarFucntion(
- ExprNodeDesc hiveExpr) {
- ILogicalExpression AlgebricksExpr;
+ /**
+ * translate scalar function expression
+ *
+ * @param hiveExpr
+ * @return
+ */
+ public Mutable<ILogicalExpression> translateScalarFucntion(ExprNodeDesc hiveExpr) {
+ ILogicalExpression AlgebricksExpr;
- if (hiveExpr instanceof ExprNodeGenericFuncDesc) {
- List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
- List<ExprNodeDesc> children = hiveExpr.getChildren();
+ if (hiveExpr instanceof ExprNodeGenericFuncDesc) {
+ List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
+ List<ExprNodeDesc> children = hiveExpr.getChildren();
- for (ExprNodeDesc child : children)
- arguments.add(translateScalarFucntion(child));
+ for (ExprNodeDesc child : children)
+ arguments.add(translateScalarFucntion(child));
- ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr;
- GenericUDF genericUdf = funcExpr.getGenericUDF();
- UDF udf = null;
- if (genericUdf instanceof GenericUDFBridge) {
- GenericUDFBridge bridge = (GenericUDFBridge) genericUdf;
- try {
- udf = bridge.getUdfClass().newInstance();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr;
+ GenericUDF genericUdf = funcExpr.getGenericUDF();
+ UDF udf = null;
+ if (genericUdf instanceof GenericUDFBridge) {
+ GenericUDFBridge bridge = (GenericUDFBridge) genericUdf;
+ try {
+ udf = bridge.getUdfClass().newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
- /**
- * set up the hive function
- */
- Object hiveFunction = genericUdf;
- if (udf != null)
- hiveFunction = udf;
+ /**
+ * set up the hive function
+ */
+ Object hiveFunction = genericUdf;
+ if (udf != null)
+ hiveFunction = udf;
- FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE
- .getAlgebricksFunctionId(hiveFunction.getClass());
- if (funcId == null) {
- funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,
- hiveFunction.getClass().getName());
- }
+ FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE.getAlgebricksFunctionId(hiveFunction
+ .getClass());
+ if (funcId == null) {
+ funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, hiveFunction.getClass().getName());
+ }
- Object functionInfo = null;
- if (genericUdf instanceof GenericUDFBridge) {
- functionInfo = funcExpr;
- }
+ Object functionInfo = null;
+ if (genericUdf instanceof GenericUDFBridge) {
+ functionInfo = funcExpr;
+ }
- /**
- * generate the function call expression
- */
- ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(
- new HiveFunctionInfo(funcId, functionInfo), arguments);
- AlgebricksExpr = AlgebricksFuncExpr;
+ /**
+ * generate the function call expression
+ */
+ ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(
+ funcId, functionInfo), arguments);
+ AlgebricksExpr = AlgebricksFuncExpr;
- } else if (hiveExpr instanceof ExprNodeColumnDesc) {
- ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr;
- LogicalVariable var = this.getVariable(column.getColumn());
- AlgebricksExpr = new VariableReferenceExpression(var);
+ } else if (hiveExpr instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr;
+ LogicalVariable var = this.getVariable(column.getColumn());
+ AlgebricksExpr = new VariableReferenceExpression(var);
- } else if (hiveExpr instanceof ExprNodeFieldDesc) {
- FunctionIdentifier funcId;
- funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,
- ExpressionConstant.FIELDACCESS);
+ } else if (hiveExpr instanceof ExprNodeFieldDesc) {
+ FunctionIdentifier funcId;
+ funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.FIELDACCESS);
- ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(
- new HiveFunctionInfo(funcId, hiveExpr));
- AlgebricksExpr = AlgebricksFuncExpr;
- } else if (hiveExpr instanceof ExprNodeConstantDesc) {
- ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr;
- Object value = hiveConst.getValue();
- AlgebricksExpr = new ConstantExpression(
- new HivesterixConstantValue(value));
- } else if (hiveExpr instanceof ExprNodeNullDesc) {
- FunctionIdentifier funcId;
- funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE,
- ExpressionConstant.NULL);
+ ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(
+ funcId, hiveExpr));
+ AlgebricksExpr = AlgebricksFuncExpr;
+ } else if (hiveExpr instanceof ExprNodeConstantDesc) {
+ ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr;
+ Object value = hiveConst.getValue();
+ AlgebricksExpr = new ConstantExpression(new HivesterixConstantValue(value));
+ } else if (hiveExpr instanceof ExprNodeNullDesc) {
+ FunctionIdentifier funcId;
+ funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.NULL);
- ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(
- new HiveFunctionInfo(funcId, hiveExpr));
+ ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(
+ funcId, hiveExpr));
- AlgebricksExpr = AlgebricksFuncExpr;
- } else {
- throw new IllegalStateException("unknown hive expression");
- }
- return new MutableObject<ILogicalExpression>(AlgebricksExpr);
- }
+ AlgebricksExpr = AlgebricksFuncExpr;
+ } else {
+ throw new IllegalStateException("unknown hive expression");
+ }
+ return new MutableObject<ILogicalExpression>(AlgebricksExpr);
+ }
- /**
- * translate aggregation function expression
- *
- * @param aggregateDesc
- * @return
- */
- public Mutable<ILogicalExpression> translateAggregation(
- AggregationDesc aggregateDesc) {
+ /**
+ * translate aggregation function expression
+ *
+ * @param aggregateDesc
+ * @return
+ */
+ public Mutable<ILogicalExpression> translateAggregation(AggregationDesc aggregateDesc) {
- String UDAFName = aggregateDesc.getGenericUDAFName();
+ String UDAFName = aggregateDesc.getGenericUDAFName();
- List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
- List<ExprNodeDesc> children = aggregateDesc.getParameters();
+ List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
+ List<ExprNodeDesc> children = aggregateDesc.getParameters();
- for (ExprNodeDesc child : children)
- arguments.add(translateScalarFucntion(child));
+ for (ExprNodeDesc child : children)
+ arguments.add(translateScalarFucntion(child));
- FunctionIdentifier funcId = new FunctionIdentifier(
- ExpressionConstant.NAMESPACE, UDAFName + "("
- + aggregateDesc.getMode() + ")");
- HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc);
- AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(
- funcInfo, false, arguments);
- return new MutableObject<ILogicalExpression>(aggregationExpression);
- }
+ FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDAFName + "("
+ + aggregateDesc.getMode() + ")");
+ HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc);
+ AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(funcInfo, false,
+ arguments);
+ return new MutableObject<ILogicalExpression>(aggregationExpression);
+ }
- /**
- * translate aggregation function expression
- *
- * @param aggregator
- * @return
- */
- public Mutable<ILogicalExpression> translateUnnestFunction(
- UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) {
+ /**
+ * translate unnest (UDTF) function expression
+ *
+ * @param udtfDesc
+ * @return
+ */
+ public Mutable<ILogicalExpression> translateUnnestFunction(UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) {
- String UDTFName = udtfDesc.getUDTFName();
+ String UDTFName = udtfDesc.getUDTFName();
- FunctionIdentifier funcId = new FunctionIdentifier(
- ExpressionConstant.NAMESPACE, UDTFName);
- UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(
- new HiveFunctionInfo(funcId, udtfDesc));
- unnestingExpression.getArguments().add(argument);
- return new MutableObject<ILogicalExpression>(unnestingExpression);
- }
+ FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDTFName);
+ UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(new HiveFunctionInfo(
+ funcId, udtfDesc));
+ unnestingExpression.getArguments().add(argument);
+ return new MutableObject<ILogicalExpression>(unnestingExpression);
+ }
- /**
- * get typeinfo
- */
- @Override
- public TypeInfo getType(LogicalVariable var) {
- return variableToType.get(var);
- }
+ /**
+ * get typeinfo
+ */
+ @Override
+ public TypeInfo getType(LogicalVariable var) {
+ return variableToType.get(var);
+ }
- /**
- * get variable from variable name
- */
- @Override
- public LogicalVariable getVariable(String name) {
- return nameToLogicalVariableMap.get(name);
- }
+ /**
+ * get variable from variable name
+ */
+ @Override
+ public LogicalVariable getVariable(String name) {
+ return nameToLogicalVariableMap.get(name);
+ }
- @Override
- public LogicalVariable getVariableFromFieldName(String fieldName) {
- return this.getVariableOnly(fieldName);
- }
+ @Override
+ public LogicalVariable getVariableFromFieldName(String fieldName) {
+ return this.getVariableOnly(fieldName);
+ }
- /**
- * set the metadata provider
- */
- @Override
- public void setMetadataProvider(
- IMetadataProvider<PartitionDesc, Object> metadata) {
- this.metaData = metadata;
- }
+ /**
+ * set the metadata provider
+ */
+ @Override
+ public void setMetadataProvider(IMetadataProvider<PartitionDesc, Object> metadata) {
+ this.metaData = metadata;
+ }
- /**
- * insert ReplicateOperator when necessary
- */
- private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) {
- Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
- buildChildToParentsMapping(roots, childToParentsMap);
- for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap
- .entrySet()) {
- List<Mutable<ILogicalOperator>> pList = entry.getValue();
- if (pList.size() > 1) {
- ILogicalOperator rop = new ReplicateOperator(pList.size());
- Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(
- rop);
- Mutable<ILogicalOperator> childRef = entry.getKey();
- rop.getInputs().add(childRef);
- for (Mutable<ILogicalOperator> parentRef : pList) {
- ILogicalOperator parentOp = parentRef.getValue();
- int index = parentOp.getInputs().indexOf(childRef);
- parentOp.getInputs().set(index, ropRef);
- }
- }
- }
- }
+ /**
+ * insert ReplicateOperator when necessary
+ */
+ private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) {
+ Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
+ buildChildToParentsMapping(roots, childToParentsMap);
+ for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap.entrySet()) {
+ List<Mutable<ILogicalOperator>> pList = entry.getValue();
+ if (pList.size() > 1) {
+ ILogicalOperator rop = new ReplicateOperator(pList.size());
+ Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
+ Mutable<ILogicalOperator> childRef = entry.getKey();
+ rop.getInputs().add(childRef);
+ for (Mutable<ILogicalOperator> parentRef : pList) {
+ ILogicalOperator parentOp = parentRef.getValue();
+ int index = parentOp.getInputs().indexOf(childRef);
+ parentOp.getInputs().set(index, ropRef);
+ }
+ }
+ }
+ }
- /**
- * build the mapping from child to Parents
- *
- * @param roots
- * @param childToParentsMap
- */
- private void buildChildToParentsMapping(
- List<Mutable<ILogicalOperator>> roots,
- Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) {
- for (Mutable<ILogicalOperator> opRef : roots) {
- List<Mutable<ILogicalOperator>> childRefs = opRef.getValue()
- .getInputs();
- for (Mutable<ILogicalOperator> childRef : childRefs) {
- List<Mutable<ILogicalOperator>> parentList = map.get(childRef);
- if (parentList == null) {
- parentList = new ArrayList<Mutable<ILogicalOperator>>();
- map.put(childRef, parentList);
- }
- if (!parentList.contains(opRef))
- parentList.add(opRef);
- }
- buildChildToParentsMapping(childRefs, map);
- }
- }
+ /**
+ * build the mapping from child to Parents
+ *
+ * @param roots
+ * @param childToParentsMap
+ */
+ private void buildChildToParentsMapping(List<Mutable<ILogicalOperator>> roots,
+ Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) {
+ for (Mutable<ILogicalOperator> opRef : roots) {
+ List<Mutable<ILogicalOperator>> childRefs = opRef.getValue().getInputs();
+ for (Mutable<ILogicalOperator> childRef : childRefs) {
+ List<Mutable<ILogicalOperator>> parentList = map.get(childRef);
+ if (parentList == null) {
+ parentList = new ArrayList<Mutable<ILogicalOperator>>();
+ map.put(childRef, parentList);
+ }
+ if (!parentList.contains(opRef))
+ parentList.add(opRef);
+ }
+ buildChildToParentsMapping(childRefs, map);
+ }
+ }
}
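
Aside from the whitespace/indentation normalization, the substantive changes in HiveAlgebricksTranslator are its two error paths, which previously printed to stdout and kept going: rewriteOperatorOutputSchema now throws an IllegalStateException on an output-cardinality mismatch, and rewriteExpression now throws when a column cannot be resolved to a logical variable, so a bad plan fails fast during translation instead of producing silent diagnostics.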
diff --git a/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index 2d8829c..6c1ac72 100644
--- a/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -43,6 +43,7 @@
import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy.Policy;
import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryComparatorFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFamilyProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveNormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HivePrinterFactoryProvider;
import edu.uci.ics.hivesterix.runtime.provider.HiveSerializerDeserializerProvider;
@@ -71,527 +72,483 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public class HyracksExecutionEngine implements IExecutionEngine {
- private static final Log LOG = LogFactory
- .getLog(HyracksExecutionEngine.class.getName());
+ private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
- // private static final String[] locConstraints = {}
+ // private static final String[] locConstraints = {}
- private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
- private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
- static {
- SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(
- false);
- SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(
- true);
- SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(
- true);
- DEFAULT_LOGICAL_REWRITES
- .add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(
- seqCtrlFullDfs, HiveRuleCollections.NORMALIZATION));
- DEFAULT_LOGICAL_REWRITES
- .add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(
- seqCtrlNoDfs,
- HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));
- DEFAULT_LOGICAL_REWRITES
- .add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(
- seqCtrlFullDfs, HiveRuleCollections.LOAD_FIELDS));
- DEFAULT_LOGICAL_REWRITES
- .add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(
- seqCtrlNoDfs, HiveRuleCollections.OP_PUSHDOWN));
- DEFAULT_LOGICAL_REWRITES
- .add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(
- seqOnceCtrl, HiveRuleCollections.DATA_EXCHANGE));
- DEFAULT_LOGICAL_REWRITES
- .add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(
- seqCtrlNoDfs, HiveRuleCollections.CONSOLIDATION));
+ private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
+ private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
+ static {
+ SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);
+ SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(true);
+ SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
+ HiveRuleCollections.NORMALIZATION));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
+ HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
+ HiveRuleCollections.LOAD_FIELDS));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
+ HiveRuleCollections.OP_PUSHDOWN));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
+ HiveRuleCollections.DATA_EXCHANGE));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
+ HiveRuleCollections.CONSOLIDATION));
- DEFAULT_PHYSICAL_REWRITES
- .add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(
- seqOnceCtrl, HiveRuleCollections.PHYSICAL_PLAN_REWRITES));
- DEFAULT_PHYSICAL_REWRITES
- .add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(
- seqOnceCtrl, HiveRuleCollections.prepareJobGenRules));
- }
+ DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
+ HiveRuleCollections.PHYSICAL_PLAN_REWRITES));
+ DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
+ HiveRuleCollections.prepareJobGenRules));
+ }
- /**
- * static configurations for compiler
- */
- private HeuristicCompilerFactoryBuilder builder;
+ /**
+ * static configurations for compiler
+ */
+ private HeuristicCompilerFactoryBuilder builder;
- /**
- * compiler
- */
- private ICompiler compiler;
+ /**
+ * compiler
+ */
+ private ICompiler compiler;
- /**
- * physical optimization config
- */
- private PhysicalOptimizationConfig physicalOptimizationConfig;
+ /**
+ * physical optimization config
+ */
+ private PhysicalOptimizationConfig physicalOptimizationConfig;
- /**
- * final ending operators
- */
- private List<Operator> leaveOps = new ArrayList<Operator>();
+ /**
+ * final leaf operators of the Hive plan
+ */
+ private List<Operator> leaveOps = new ArrayList<Operator>();
- /**
- * tasks that are already visited
- */
- private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();
+ /**
+ * tasks that are already visited
+ */
+ private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();
- /**
- * hyracks job spec
- */
- private JobSpecification jobSpec;
+ /**
+ * hyracks job spec
+ */
+ private JobSpecification jobSpec;
- /**
- * hive configuration
- */
- private HiveConf conf;
+ /**
+ * hive configuration
+ */
+ private HiveConf conf;
- /**
- * plan printer
- */
- private PrintWriter planPrinter;
+ /**
+ * plan printer
+ */
+ private PrintWriter planPrinter;
- public HyracksExecutionEngine(HiveConf conf) {
- this.conf = conf;
- init(conf);
- }
+ public HyracksExecutionEngine(HiveConf conf) {
+ this.conf = conf;
+ init(conf);
+ }
- public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {
- this.conf = conf;
- this.planPrinter = planPrinter;
- init(conf);
- }
+ public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {
+ this.conf = conf;
+ this.planPrinter = planPrinter;
+ init(conf);
+ }
- private void init(HiveConf conf) {
- builder = new HeuristicCompilerFactoryBuilder(
- DefaultOptimizationContextFactory.INSTANCE);
- builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);
- builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);
- builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);
- builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);
- builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);
+ private void init(HiveConf conf) {
+ builder = new HeuristicCompilerFactoryBuilder(DefaultOptimizationContextFactory.INSTANCE);
+ builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);
+ builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);
+ builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);
+ builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);
+ builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);
- long memSizeExternalGby = conf.getLong(
- "hive.algebricks.groupby.external.memory", 268435456);
- long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory",
- 536870912);
- int frameSize = conf.getInt("hive.algebricks.framesize", 32768);
+ long memSizeExternalGby = conf.getLong("hive.algebricks.groupby.external.memory", 268435456);
+ long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory", 536870912);
+ int frameSize = conf.getInt("hive.algebricks.framesize", 32768);
- physicalOptimizationConfig = new PhysicalOptimizationConfig();
- int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);
- physicalOptimizationConfig
- .setMaxFramesExternalGroupBy(frameLimitExtGby);
- int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);
- physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);
- builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);
- }
+ physicalOptimizationConfig = new PhysicalOptimizationConfig();
+ int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);
+ physicalOptimizationConfig.setMaxFramesExternalGroupBy(frameLimitExtGby);
+ int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);
+ physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);
+ builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);
+ }
- @Override
- public int compileJob(List<Task<? extends Serializable>> rootTasks) {
- // clean up
- leaveOps.clear();
- tasksVisited.clear();
- jobSpec = null;
+ @Override
+ public int compileJob(List<Task<? extends Serializable>> rootTasks) {
+ // clean up
+ leaveOps.clear();
+ tasksVisited.clear();
+ jobSpec = null;
- HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();
- List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);
+ HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();
+ List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);
- // get all leave Ops
- getLeaves(rootOps, leaveOps);
+ // collect all leaf operators (stored in leaveOps)
+ getLeaves(rootOps, leaveOps);
- HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();
- try {
- translator.translate(rootOps, null, aliasToPath);
+ HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();
+ try {
+ translator.translate(rootOps, null, aliasToPath);
- ILogicalPlan plan = translator.genLogicalPlan();
+ ILogicalPlan plan = translator.genLogicalPlan();
- if (plan.getRoots() != null && plan.getRoots().size() > 0
- && plan.getRoots().get(0).getValue() != null) {
- translator.printOperators();
- System.out.println("translate complete");
- ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(
- plan, translator.getMetadataProvider());
+ if (plan.getRoots() != null && plan.getRoots().size() > 0 && plan.getRoots().get(0).getValue() != null) {
+ translator.printOperators();
+ ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(plan,
+ translator.getMetadataProvider());
- ICompilerFactory compilerFactory = builder.create();
- compiler = compilerFactory.createCompiler(
- planAndMetadata.getPlan(),
- planAndMetadata.getMetadataProvider(),
- translator.getVariableCounter());
+ ICompilerFactory compilerFactory = builder.create();
+ compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),
+ planAndMetadata.getMetadataProvider(), translator.getVariableCounter());
- // run optimization and re-writing rules for Hive plan
- compiler.optimize();
+ // run optimization and rewriting rules on the Hive plan
+ compiler.optimize();
- // print optimized plan
- LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
- StringBuilder buffer = new StringBuilder();
- PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
- String planStr = buffer.toString();
- System.out.println(planStr);
+ // print optimized plan
+ LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
+ StringBuilder buffer = new StringBuilder();
+ PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
+ String planStr = buffer.toString();
+ System.out.println(planStr);
- if (planPrinter != null)
- planPrinter.print(planStr);
- }
- } catch (Exception e) {
- e.printStackTrace();
- return 1;
- }
+ if (planPrinter != null)
+ planPrinter.print(planStr);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ return 1;
+ }
- return 0;
- }
+ return 0;
+ }
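
Taken together with executeJob() below, compileJob() is the first half of a two-phase driver: translate the Hive operator DAG into an Algebricks logical plan, then optimize it. A hedged usage sketch (conf and rootTasks are assumed to come from the surrounding Hive session):

    HyracksExecutionEngine engine = new HyracksExecutionEngine(conf);
    if (engine.compileJob(rootTasks) == 0) { // translate + optimize; returns 0 on success
        engine.executeJob();                 // codeGen() + submit to the Hyracks cluster
    }
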
- private void codeGen() throws AlgebricksException {
- // number of cpu cores in the cluster
- builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(
- ConfUtil.getNCs()));
- // builder.setClusterTopology(ConfUtil.getClusterTopology());
- builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);
- builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);
- builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);
- builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);
- builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);
- builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);
- builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);
- builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);
- builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);
- builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);
- builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);
+ private void codeGen() throws AlgebricksException {
+ // set cluster locations from the available node controllers (NCs)
+ builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(ConfUtil.getNCs()));
+ // builder.setClusterTopology(ConfUtil.getClusterTopology());
+ builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);
+ builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);
+ builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);
+ builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);
+ builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);
+ builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);
+ builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);
+ builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);
+ builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);
+ builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);
+ builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);
+ builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);
- jobSpec = compiler.createJob(null);
+ jobSpec = compiler.createJob(null);
- // set the policy
- String policyStr = conf.get("hive.hyracks.connectorpolicy");
- if (policyStr == null)
- policyStr = "PIPELINING";
- Policy policyValue = Policy.valueOf(policyStr);
- jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(
- policyValue));
+ // set the policy
+ String policyStr = conf.get("hive.hyracks.connectorpolicy");
+ if (policyStr == null)
+ policyStr = "PIPELINING";
+ Policy policyValue = Policy.valueOf(policyStr);
+ jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(policyValue));
+ jobSpec.setUseConnectorPolicyForScheduling(false);
+ }
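
The new setHashFunctionFamilyProvider(...) call is the crux of this fix: unlike a plain hash-function factory, a family mints independently seeded hash functions, so operators that re-partition at multiple levels (e.g. hybrid hash joins) do not reuse one hash and funnel rows into the same buckets. A minimal sketch of what the provider hands out (assumes a caller that may throw AlgebricksException; the type argument is ignored by this provider):

    IBinaryHashFunctionFamily family = HiveBinaryHashFunctionFamilyProvider.INSTANCE
            .getBinaryHashFunctionFamily(null);
    IBinaryHashFunction level0 = family.createBinaryHashFunction(0); // seed 0
    IBinaryHashFunction level1 = family.createBinaryHashFunction(1); // seed 1
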
- // execute the job
- System.out.println(jobSpec.toString());
- }
+ @Override
+ public int executeJob() {
+ try {
+ codeGen();
+ executeHyraxJob(jobSpec);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return 1;
+ }
+ return 0;
+ }
- @Override
- public int executeJob() {
- try {
- codeGen();
- executeHyraxJob(jobSpec);
- } catch (Exception e) {
- e.printStackTrace();
- return 1;
- }
- return 0;
- }
+ private List<Operator> generateRootOperatorDAG(List<Task<? extends Serializable>> rootTasks,
+ HashMap<String, PartitionDesc> aliasToPath) {
- private List<Operator> generateRootOperatorDAG(
- List<Task<? extends Serializable>> rootTasks,
- HashMap<String, PartitionDesc> aliasToPath) {
+ List<Operator> rootOps = new ArrayList<Operator>();
+ List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();
+ tasksVisited.clear();
- List<Operator> rootOps = new ArrayList<Operator>();
- List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();
- tasksVisited.clear();
+ for (int i = rootTasks.size() - 1; i >= 0; i--) {
+ /**
+ * the i-th root task, typically a map-reduce task
+ */
+ Task<? extends Serializable> task = rootTasks.get(i);
+ // System.out.println("!" + task.getName());
- for (int i = rootTasks.size() - 1; i >= 0; i--) {
- /**
- * list of map-reduce tasks
- */
- Task<? extends Serializable> task = rootTasks.get(i);
- // System.out.println("!" + task.getName());
+ if (task instanceof MapRedTask) {
+ List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);
+ if (i == 0)
+ rootOps.addAll(mapRootOps);
+ else {
+ List<Operator> leaves = new ArrayList<Operator>();
+ getLeaves(rootOps, leaves);
- if (task instanceof MapRedTask) {
- List<Operator> mapRootOps = articulateMapReduceOperators(task,
- rootOps, aliasToPath, rootTasks);
- if (i == 0)
- rootOps.addAll(mapRootOps);
- else {
- List<Operator> leaves = new ArrayList<Operator>();
- getLeaves(rootOps, leaves);
+ List<Operator> mapChildren = new ArrayList<Operator>();
+ for (Operator childMap : mapRootOps) {
+ if (childMap instanceof TableScanOperator) {
+ TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
+ if (topDesc == null)
+ mapChildren.add(childMap);
+ else {
+ rootOps.add(childMap);
+ }
+ } else
+ mapChildren.add(childMap);
+ }
- List<Operator> mapChildren = new ArrayList<Operator>();
- for (Operator childMap : mapRootOps) {
- if (childMap instanceof TableScanOperator) {
- TableScanDesc topDesc = (TableScanDesc) childMap
- .getConf();
- if (topDesc == null)
- mapChildren.add(childMap);
- else {
- rootOps.add(childMap);
- }
- } else
- mapChildren.add(childMap);
- }
+ if (mapChildren.size() > 0) {
+ for (Operator leaf : leaves)
+ leaf.setChildOperators(mapChildren);
+ for (Operator child : mapChildren)
+ child.setParentOperators(leaves);
+ }
+ }
- if (mapChildren.size() > 0) {
- for (Operator leaf : leaves)
- leaf.setChildOperators(mapChildren);
- for (Operator child : mapChildren)
- child.setParentOperators(leaves);
- }
- }
+ MapredWork mr = (MapredWork) task.getWork();
+ HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
- MapredWork mr = (MapredWork) task.getWork();
- HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
+ addAliasToPartition(aliasToPath, map);
+ toDelete.add(task);
+ }
+ }
- addAliasToPartition(aliasToPath, map);
- toDelete.add(task);
- }
- }
+ for (Task<? extends Serializable> task : toDelete)
+ rootTasks.remove(task);
- for (Task<? extends Serializable> task : toDelete)
- rootTasks.remove(task);
-
- return rootOps;
- }
+ return rootOps;
+ }
- private void addAliasToPartition(
- HashMap<String, PartitionDesc> aliasToPath,
- HashMap<String, PartitionDesc> map) {
- Iterator<String> keys = map.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next();
- PartitionDesc part = map.get(key);
- String[] names = key.split(":");
- for (String name : names) {
- aliasToPath.put(name, part);
- }
- }
- }
+ private void addAliasToPartition(HashMap<String, PartitionDesc> aliasToPath, HashMap<String, PartitionDesc> map) {
+ Iterator<String> keys = map.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ PartitionDesc part = map.get(key);
+ String[] names = key.split(":");
+ for (String name : names) {
+ aliasToPath.put(name, part);
+ }
+ }
+ }
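
The colon split above fans a composite alias key out to its individual aliases, all sharing one PartitionDesc. A hedged illustration with made-up names:

    // input:  map = {"src:src1" -> part}
    // result: aliasToPath = {"src" -> part, "src1" -> part}
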
- private List<Operator> articulateMapReduceOperators(Task task,
- List<Operator> rootOps, HashMap<String, PartitionDesc> aliasToPath,
- List<Task<? extends Serializable>> rootTasks) {
- // System.out.println("!"+task.getName());
- if (!(task instanceof MapRedTask)) {
- if (!(task instanceof ConditionalTask)) {
- rootTasks.add(task);
- return null;
- } else {
- // remove map-reduce branches in condition task
- ConditionalTask condition = (ConditionalTask) task;
- List<Task<? extends Serializable>> branches = condition
- .getListTasks();
- for (int i = branches.size() - 1; i >= 0; i--) {
- Task branch = branches.get(i);
- if (branch instanceof MapRedTask) {
- return articulateMapReduceOperators(branch, rootOps,
- aliasToPath, rootTasks);
- }
- }
- rootTasks.add(task);
- return null;
- }
- }
+ private List<Operator> articulateMapReduceOperators(Task task, List<Operator> rootOps,
+ HashMap<String, PartitionDesc> aliasToPath, List<Task<? extends Serializable>> rootTasks) {
+ // System.out.println("!"+task.getName());
+ if (!(task instanceof MapRedTask)) {
+ if (!(task instanceof ConditionalTask)) {
+ rootTasks.add(task);
+ return null;
+ } else {
+ // absorb map-reduce branches nested inside a conditional task
+ ConditionalTask condition = (ConditionalTask) task;
+ List<Task<? extends Serializable>> branches = condition.getListTasks();
+ for (int i = branches.size() - 1; i >= 0; i--) {
+ Task branch = branches.get(i);
+ if (branch instanceof MapRedTask) {
+ return articulateMapReduceOperators(branch, rootOps, aliasToPath, rootTasks);
+ }
+ }
+ rootTasks.add(task);
+ return null;
+ }
+ }
- MapredWork mr = (MapredWork) task.getWork();
- HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
+ MapredWork mr = (MapredWork) task.getWork();
+ HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
- // put all aliasToParitionDesc mapping into the map
- addAliasToPartition(aliasToPath, map);
+ // put all alias-to-PartitionDesc mappings into the map
+ addAliasToPartition(aliasToPath, map);
- MapRedTask mrtask = (MapRedTask) task;
- MapredWork work = (MapredWork) mrtask.getWork();
- HashMap<String, Operator<? extends Serializable>> operators = work
- .getAliasToWork();
+ MapRedTask mrtask = (MapRedTask) task;
+ MapredWork work = (MapredWork) mrtask.getWork();
+ HashMap<String, Operator<? extends Serializable>> operators = work.getAliasToWork();
- Set entries = operators.entrySet();
- Iterator<Entry<String, Operator>> iterator = entries.iterator();
- List<Operator> mapRootOps = new ArrayList<Operator>();
+ Set entries = operators.entrySet();
+ Iterator<Entry<String, Operator>> iterator = entries.iterator();
+ List<Operator> mapRootOps = new ArrayList<Operator>();
- // get map root operators
- while (iterator.hasNext()) {
- Operator next = iterator.next().getValue();
- if (!mapRootOps.contains(next)) {
- // clear that only for the case of union
- mapRootOps.add(next);
- }
- }
+ // get map root operators
+ while (iterator.hasNext()) {
+ Operator next = iterator.next().getValue();
+ if (!mapRootOps.contains(next)) {
+ // add each root only once; duplicates arise only in the union case
+ mapRootOps.add(next);
+ }
+ }
- // get map local work
- MapredLocalWork localWork = work.getMapLocalWork();
- if (localWork != null) {
- HashMap<String, Operator<? extends Serializable>> localOperators = localWork
- .getAliasToWork();
+ // get map local work
+ MapredLocalWork localWork = work.getMapLocalWork();
+ if (localWork != null) {
+ HashMap<String, Operator<? extends Serializable>> localOperators = localWork.getAliasToWork();
- Set localEntries = localOperators.entrySet();
- Iterator<Entry<String, Operator>> localIterator = localEntries
- .iterator();
- while (localIterator.hasNext()) {
- mapRootOps.add(localIterator.next().getValue());
- }
+ Set localEntries = localOperators.entrySet();
+ Iterator<Entry<String, Operator>> localIterator = localEntries.iterator();
+ while (localIterator.hasNext()) {
+ mapRootOps.add(localIterator.next().getValue());
+ }
- HashMap<String, FetchWork> localFetch = localWork
- .getAliasToFetchWork();
- Set localFetchEntries = localFetch.entrySet();
- Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries
- .iterator();
- while (localFetchIterator.hasNext()) {
- Entry<String, FetchWork> fetchMap = localFetchIterator.next();
- FetchWork fetch = fetchMap.getValue();
- String alias = fetchMap.getKey();
- List<PartitionDesc> dirPart = fetch.getPartDesc();
+ HashMap<String, FetchWork> localFetch = localWork.getAliasToFetchWork();
+ Set localFetchEntries = localFetch.entrySet();
+ Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries.iterator();
+ while (localFetchIterator.hasNext()) {
+ Entry<String, FetchWork> fetchMap = localFetchIterator.next();
+ FetchWork fetch = fetchMap.getValue();
+ String alias = fetchMap.getKey();
+ List<PartitionDesc> dirPart = fetch.getPartDesc();
- // temporary hack: put the first partitionDesc into the map
- aliasToPath.put(alias, dirPart.get(0));
- }
- }
+ // temporary hack: put the first partitionDesc into the map
+ aliasToPath.put(alias, dirPart.get(0));
+ }
+ }
- Boolean visited = tasksVisited.get(task);
- if (visited != null && visited.booleanValue() == true) {
- return mapRootOps;
- }
+ Boolean visited = tasksVisited.get(task);
+ if (visited != null && visited.booleanValue()) {
+ return mapRootOps;
+ }
- // do that only for union operator
- for (Operator op : mapRootOps)
- if (op.getParentOperators() != null)
- op.getParentOperators().clear();
+ // detach stale parent links; needed only for the union-operator case
+ for (Operator op : mapRootOps)
+ if (op.getParentOperators() != null)
+ op.getParentOperators().clear();
- List<Operator> mapLeaves = new ArrayList<Operator>();
- downToLeaves(mapRootOps, mapLeaves);
- List<Operator> reduceOps = new ArrayList<Operator>();
+ List<Operator> mapLeaves = new ArrayList<Operator>();
+ downToLeaves(mapRootOps, mapLeaves);
+ List<Operator> reduceOps = new ArrayList<Operator>();
- if (work.getReducer() != null)
- reduceOps.add(work.getReducer());
+ if (work.getReducer() != null)
+ reduceOps.add(work.getReducer());
- for (Operator mapLeaf : mapLeaves) {
- mapLeaf.setChildOperators(reduceOps);
- }
+ for (Operator mapLeaf : mapLeaves) {
+ mapLeaf.setChildOperators(reduceOps);
+ }
- for (Operator reduceOp : reduceOps) {
- if (reduceOp != null)
- reduceOp.setParentOperators(mapLeaves);
- }
+ for (Operator reduceOp : reduceOps) {
+ if (reduceOp != null)
+ reduceOp.setParentOperators(mapLeaves);
+ }
- List<Operator> leafs = new ArrayList<Operator>();
- if (reduceOps.size() > 0) {
- downToLeaves(reduceOps, leafs);
- } else {
- leafs = mapLeaves;
- }
+ List<Operator> leafs = new ArrayList<Operator>();
+ if (reduceOps.size() > 0) {
+ downToLeaves(reduceOps, leafs);
+ } else {
+ leafs = mapLeaves;
+ }
- List<Operator> mapChildren = new ArrayList<Operator>();
- if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {
- System.out.println("have child tasks!!");
- for (Object child : task.getChildTasks()) {
- System.out.println(child.getClass().getName());
- List<Operator> childMapOps = articulateMapReduceOperators(
- (Task) child, rootOps, aliasToPath, rootTasks);
- if (childMapOps == null)
- continue;
+ List<Operator> mapChildren = new ArrayList<Operator>();
+ if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {
+ for (Object child : task.getChildTasks()) {
+ List<Operator> childMapOps = articulateMapReduceOperators((Task) child, rootOps, aliasToPath, rootTasks);
+ if (childMapOps == null)
+ continue;
- for (Operator childMap : childMapOps) {
- if (childMap instanceof TableScanOperator) {
- TableScanDesc topDesc = (TableScanDesc) childMap
- .getConf();
- if (topDesc == null)
- mapChildren.add(childMap);
- else {
- rootOps.add(childMap);
- }
- } else {
- // if not table scan, add the child
- mapChildren.add(childMap);
- }
- }
- }
+ for (Operator childMap : childMapOps) {
+ if (childMap instanceof TableScanOperator) {
+ TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
+ if (topDesc == null)
+ mapChildren.add(childMap);
+ else {
+ rootOps.add(childMap);
+ }
+ } else {
+ // if not table scan, add the child
+ mapChildren.add(childMap);
+ }
+ }
+ }
- if (mapChildren.size() > 0) {
- int i = 0;
- for (Operator leaf : leafs) {
- if (leaf.getChildOperators() == null
- || leaf.getChildOperators().size() == 0)
- leaf.setChildOperators(new ArrayList<Operator>());
- leaf.getChildOperators().add(mapChildren.get(i));
- i++;
- }
- i = 0;
- for (Operator child : mapChildren) {
- if (child.getParentOperators() == null
- || child.getParentOperators().size() == 0)
- child.setParentOperators(new ArrayList<Operator>());
- child.getParentOperators().add(leafs.get(i));
- i++;
- }
- }
- }
+ if (mapChildren.size() > 0) {
+ int i = 0;
+ for (Operator leaf : leafs) {
+ if (leaf.getChildOperators() == null || leaf.getChildOperators().size() == 0)
+ leaf.setChildOperators(new ArrayList<Operator>());
+ leaf.getChildOperators().add(mapChildren.get(i));
+ i++;
+ }
+ i = 0;
+ for (Operator child : mapChildren) {
+ if (child.getParentOperators() == null || child.getParentOperators().size() == 0)
+ child.setParentOperators(new ArrayList<Operator>());
+ child.getParentOperators().add(leafs.get(i));
+ i++;
+ }
+ }
+ }
- // mark this task as visited
- this.tasksVisited.put(task, true);
- return mapRootOps;
- }
+ // mark this task as visited
+ this.tasksVisited.put(task, true);
+ return mapRootOps;
+ }
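
The stitching above rewires Hive's per-task operator trees into a single DAG: map leaves feed the reducer, and the scan-less roots of child tasks are attached to the current leaves pairwise by index (which assumes leafs and mapChildren line up one-to-one). Schematically, with hypothetical operator names:

    // TS_a -> ... -> mapLeaf -> reducer -> ... -> leaf[i] -> mapChildren[i]
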
- /**
- * down to leaf nodes
- *
- * @param ops
- * @param leaves
- */
- private void downToLeaves(List<Operator> ops, List<Operator> leaves) {
+ /**
+ * recursively descend from the given operators to their leaf nodes
+ *
+ * @param ops
+ *            the operators to start from
+ * @param leaves
+ *            the output list collecting the leaf operators
+ */
+ private void downToLeaves(List<Operator> ops, List<Operator> leaves) {
- // Operator currentOp;
- for (Operator op : ops) {
- if (op != null && op.getChildOperators() != null
- && op.getChildOperators().size() > 0) {
- downToLeaves(op.getChildOperators(), leaves);
- } else {
- if (op != null && leaves.indexOf(op) < 0)
- leaves.add(op);
- }
- }
- }
+ for (Operator op : ops) {
+ if (op != null && op.getChildOperators() != null && op.getChildOperators().size() > 0) {
+ downToLeaves(op.getChildOperators(), leaves);
+ } else {
+ if (op != null && leaves.indexOf(op) < 0)
+ leaves.add(op);
+ }
+ }
+ }
- private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {
- for (Operator op : roots) {
- List<Operator> children = op.getChildOperators();
- if (children == null || children.size() <= 0) {
- currentLeaves.add(op);
- } else {
- getLeaves(children, currentLeaves);
- }
- }
- }
+ private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {
+ for (Operator op : roots) {
+ List<Operator> children = op.getChildOperators();
+ if (children == null || children.size() <= 0) {
+ currentLeaves.add(op);
+ } else {
+ getLeaves(children, currentLeaves);
+ }
+ }
+ }
- private void executeHyraxJob(JobSpecification job) throws Exception {
- String ipAddress = conf.get("hive.hyracks.host");
- int port = Integer.parseInt(conf.get("hive.hyracks.port"));
- String applicationName = conf.get("hive.hyracks.app");
- System.out.println("connect to " + ipAddress + " " + port);
+ private void executeHyraxJob(JobSpecification job) throws Exception {
+ String ipAddress = conf.get("hive.hyracks.host");
+ int port = Integer.parseInt(conf.get("hive.hyracks.port"));
+ String applicationName = conf.get("hive.hyracks.app");
+ //System.out.println("connect to " + ipAddress + " " + port);
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- System.out.println("get connected");
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(applicationName, job);
- hcc.waitForCompletion(jobId);
+ //System.out.println("get connected");
+ long start = System.currentTimeMillis();
+ JobId jobId = hcc.startJob(applicationName, job);
+ hcc.waitForCompletion(jobId);
- System.out.println("job finished: " + jobId.toString());
- // call all leave nodes to end
- for (Operator leaf : leaveOps) {
- jobClose(leaf);
- }
+ //System.out.println("job finished: " + jobId.toString());
+ // notify all leaf operators that the job has ended
+ for (Operator leaf : leaveOps) {
+ jobClose(leaf);
+ }
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
- }
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
+ }
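
executeHyraxJob reads its connection endpoint from three Hive properties; a sketch of how a client might set them (the values are placeholders, not defaults shipped with hivesterix):

    conf.set("hive.hyracks.host", "127.0.0.1");
    conf.set("hive.hyracks.port", "1098");
    conf.set("hive.hyracks.app", "hivesterix");
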
- /**
- * mv to final directory on hdfs (not real final)
- *
- * @param leaf
- * @throws Exception
- */
- private void jobClose(Operator leaf) throws Exception {
- FileSinkOperator fsOp = (FileSinkOperator) leaf;
- FileSinkDesc desc = fsOp.getConf();
- boolean isNativeTable = !desc.getTableInfo().isNonNative();
- if ((conf != null) && isNativeTable) {
- String specPath = desc.getDirName();
- DynamicPartitionCtx dpCtx = desc.getDynPartCtx();
- // for 0.7.0
- fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);
- // for 0.8.0
- // Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,
- // desc);
- }
- }
+ /**
+ * move output to its final directory on HDFS (not yet the truly final location)
+ *
+ * @param leaf
+ *            the file-sink operator whose output is moved
+ * @throws Exception
+ */
+ private void jobClose(Operator leaf) throws Exception {
+ FileSinkOperator fsOp = (FileSinkOperator) leaf;
+ FileSinkDesc desc = fsOp.getConf();
+ boolean isNativeTable = !desc.getTableInfo().isNonNative();
+ if ((conf != null) && isNativeTable) {
+ String specPath = desc.getDirName();
+ DynamicPartitionCtx dpCtx = desc.getDynPartCtx();
+ // for 0.7.0
+ fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);
+ // for 0.8.0
+ // Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,
+ // desc);
+ }
+ }
}
diff --git a/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java b/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java
new file mode 100644
index 0000000..760a614
--- /dev/null
+++ b/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java
@@ -0,0 +1,63 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
+
+ public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
+
+ private static final long serialVersionUID = 1L;
+
+ private MurmurHash3BinaryHashFunctionFamily() {
+ }
+
+ private static final int C1 = 0xcc9e2d51; // murmur3 x86_32 mixing constant
+ private static final int C2 = 0x1b873593; // murmur3 x86_32 mixing constant
+ private static final int C3 = 5; // per-round multiplier
+ private static final int C4 = 0xe6546b64; // per-round addend
+ private static final int C5 = 0x85ebca6b; // finalization (fmix) constant
+ private static final int C6 = 0xc2b2ae35; // finalization (fmix) constant
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+ return new IBinaryHashFunction() {
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ int h = seed;
+ int p = offset;
+ int remain = length;
+ while (remain >= 4) {
+ int k = (bytes[p] & 0xff) | ((bytes[p + 1] & 0xff) << 8) | ((bytes[p + 2] & 0xff) << 16)
+ | ((bytes[p + 3] & 0xff) << 24);
+ k *= C1;
+ k = Integer.rotateLeft(k, 15);
+ k *= C2;
+ h ^= k;
+ h = Integer.rotateLeft(h, 13);
+ h = h * C3 + C4;
+ p += 4;
+ remain -= 4;
+ }
+ if (remain > 0) {
+ int k = 0;
+ for (int i = 0; remain > 0; i += 8) {
+ k ^= (bytes[p++] & 0xff) << i;
+ remain--;
+ }
+ k *= C1;
+ k = Integer.rotateLeft(k, 15);
+ k *= C2;
+ h ^= k;
+ }
+ h ^= length;
+ h ^= (h >>> 16);
+ h *= C5;
+ h ^= (h >>> 13);
+ h *= C6;
+ h ^= (h >>> 16);
+ return h;
+ }
+ };
+ }
+}
\ No newline at end of file
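
A quick sanity sketch of the family's seeding behavior (assumed test values, not part of the patch): the same bytes hashed under different seeds should produce different values with overwhelming probability, which is exactly the property multi-level hash partitioning needs.

    byte[] data = "hive".getBytes();
    IBinaryHashFunctionFamily family = MurmurHash3BinaryHashFunctionFamily.INSTANCE;
    int h0 = family.createBinaryHashFunction(0).hash(data, 0, data.length);
    int h1 = family.createBinaryHashFunction(1).hash(data, 0, data.length);
    System.out.println(h0 != h1); // true in all but astronomically rare cases
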
diff --git a/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java b/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
index 12222b4..7681bd1 100644
--- a/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
+++ b/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
@@ -175,9 +175,7 @@
if (outputColumnsOffset[i] >= 0)
outputFieldRefs[outputColumnsOffset[i]] = fieldRefs.get(i);
- long serDeTime = 0;
while (reader.next(key, value)) {
- long start = System.currentTimeMillis();
// reuse the tuple builder
tb.reset();
if (parser != null) {
@@ -205,8 +203,6 @@
i++;
}
}
- long end = System.currentTimeMillis();
- serDeTime += (end - start);
if (!appender.append(tb.getFieldEndOffsets(),
tb.getByteArray(), 0, tb.getSize())) {
@@ -221,7 +217,6 @@
}
}
}
- System.out.println("serde time: " + serDeTime);
reader.close();
System.gc();
diff --git a/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java b/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java
new file mode 100644
index 0000000..e7a2e79
--- /dev/null
+++ b/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.MurmurHash3BinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class HiveBinaryHashFunctionFamilyProvider implements IBinaryHashFunctionFamilyProvider {
+
+ public static final HiveBinaryHashFunctionFamilyProvider INSTANCE = new HiveBinaryHashFunctionFamilyProvider();
+
+ private HiveBinaryHashFunctionFamilyProvider() {
+ }
+
+ @Override
+ public IBinaryHashFunctionFamily getBinaryHashFunctionFamily(Object type) throws AlgebricksException {
+ return MurmurHash3BinaryHashFunctionFamily.INSTANCE;
+ }
+}
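
This is the INSTANCE wired into the builder by codeGen() above. Since the type argument is ignored, every Hive type currently hashes through the same MurmurHash3 family; a per-type dispatch could be added here later without touching the call sites.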