split hivesterix into several modules
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3074 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-translator/pom.xml b/hivesterix/hivesterix-translator/pom.xml
new file mode 100644
index 0000000..b99d652
--- /dev/null
+++ b/hivesterix/hivesterix-translator/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hivesterix-translator</artifactId>
+ <name>hivesterix-translator</name>
+
+ <!-- groupId and version are inherited from the hivesterix aggregator POM. -->
+ <parent>
+ <artifactId>hivesterix</artifactId>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <!-- NOTE(review): maven-compiler-plugin 2.0.2 predates Java 7; reliable
+ source/target 1.7 support generally requires 2.3.2 or newer. Confirm
+ the build works, and align with the sibling hivesterix modules. -->
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ <encoding>UTF-8</encoding>
+ <!-- fork=true runs javac in a separate JVM process. -->
+ <fork>true</fork>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- Hive query compiler/executor: supplies the Operator tree and
+ ExprNodeDesc classes this module translates from. -->
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <!-- Algebricks: the target logical-plan algebra. -->
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>algebricks-compiler</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hivesterix-common</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hivesterix-runtime</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
new file mode 100644
index 0000000..80b3fef
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveAlgebricksTranslator.java
@@ -0,0 +1,807 @@
+package edu.uci.ics.hivesterix.logical.plan;
+
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionConstant;
+import edu.uci.ics.hivesterix.logical.expression.HiveAlgebricksBuiltInFunctionMap;
+import edu.uci.ics.hivesterix.logical.expression.HiveFunctionInfo;
+import edu.uci.ics.hivesterix.logical.expression.HivesterixConstantValue;
+import edu.uci.ics.hivesterix.logical.plan.visitor.ExtractVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.FilterVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.GroupByVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.JoinVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.LateralViewJoinVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.LimitVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.MapJoinVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.ProjectVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.SortVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.TableScanWriteVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.UnionVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Visitor;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+
+@SuppressWarnings("rawtypes")
+/**
+ * Translates a Hive operator DAG into an Algebricks logical plan. Each Hive
+ * operator is offered to a chain of registered {@link Visitor}s; the first
+ * visitor that produces a non-null Algebricks operator wins. The translator
+ * also owns the bookkeeping that maps Hive field names to Algebricks
+ * {@link LogicalVariable}s and their {@link TypeInfo}.
+ */
+public class HiveAlgebricksTranslator implements Translator {
+
+ // monotonically increasing counter used to mint fresh LogicalVariable ids
+ private int currentVariable = 0;
+
+ // leaf (sink) operator refs collected by translate(); becomes the plan roots
+ private List<Mutable<ILogicalOperator>> logicalOp = new ArrayList<Mutable<ILogicalOperator>>();
+
+ // per-operator flag: whether translate() should recurse into child operators
+ private boolean continueTraverse = true;
+
+ private IMetadataProvider<PartitionDesc, Object> metaData;
+
+ /**
+ * map variable name (LogicalVariable.toString()) to the logical variable
+ */
+ private HashMap<String, LogicalVariable> nameToLogicalVariableMap = new HashMap<String, LogicalVariable>();
+
+ /**
+ * map Hive field name ("alias.column") to LogicalVariable
+ */
+ private HashMap<String, LogicalVariable> fieldToLogicalVariableMap = new HashMap<String, LogicalVariable>();
+
+ /**
+ * inverse of the above: map logical variable back to its field name
+ */
+ private HashMap<LogicalVariable, String> logicalVariableToFieldMap = new HashMap<LogicalVariable, String>();
+
+ /**
+ * asterix root operators
+ */
+ private List<Mutable<ILogicalOperator>> rootOperators = new ArrayList<Mutable<ILogicalOperator>>();
+
+ /**
+ * a list of visitors; populated once in translate()
+ */
+ private List<Visitor> visitors = new ArrayList<Visitor>();
+
+ /**
+ * output writer to print things out; static and replaceable via
+ * setOutputPrinter, shared by all translator instances
+ */
+ private static PrintWriter outputWriter = new PrintWriter(new OutputStreamWriter(System.out));
+
+ /**
+ * map a logical variable to type info
+ */
+ private HashMap<LogicalVariable, TypeInfo> variableToType = new HashMap<LogicalVariable, TypeInfo>();
+
+ /**
+ * Return the LogicalVariable bound to fieldName, minting and registering a
+ * fresh one in all four bookkeeping maps on first sight of the field.
+ */
+ @Override
+ public LogicalVariable getVariable(String fieldName, TypeInfo type) {
+ LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);
+ if (var == null) {
+ currentVariable++;
+ var = new LogicalVariable(currentVariable);
+ fieldToLogicalVariableMap.put(fieldName, var);
+ nameToLogicalVariableMap.put(var.toString(), var);
+ variableToType.put(var, type);
+ logicalVariableToFieldMap.put(var, fieldName);
+ }
+ return var;
+ }
+
+ /**
+ * Always mint a fresh LogicalVariable for fieldName, rebinding the field if
+ * it was already mapped. Registration mirrors getVariable().
+ */
+ @Override
+ public LogicalVariable getNewVariable(String fieldName, TypeInfo type) {
+ currentVariable++;
+ LogicalVariable var = new LogicalVariable(currentVariable);
+ fieldToLogicalVariableMap.put(fieldName, var);
+ nameToLogicalVariableMap.put(var.toString(), var);
+ variableToType.put(var, type);
+ logicalVariableToFieldMap.put(var, fieldName);
+ return var;
+ }
+
+ /**
+ * Redirect oldVar's field binding to newVar: afterwards both the old and
+ * the new variable names resolve to newVar. No-op when oldVar is unknown.
+ * Note: oldVar's own entry in logicalVariableToFieldMap is intentionally
+ * left in place so reverse lookups of oldVar still succeed.
+ */
+ @Override
+ public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar) {
+ String name = this.logicalVariableToFieldMap.get(oldVar);
+ if (name != null) {
+ fieldToLogicalVariableMap.put(name, newVar);
+ nameToLogicalVariableMap.put(newVar.toString(), newVar);
+ nameToLogicalVariableMap.put(oldVar.toString(), newVar);
+ logicalVariableToFieldMap.put(newVar, name);
+ }
+ }
+
+ /** Return the metadata provider set via setMetadataProvider (may be null). */
+ @Override
+ public IMetadataProvider<PartitionDesc, Object> getMetadataProvider() {
+ return metaData;
+ }
+
+ /**
+ * only get an existing variable for fieldName, without creating one
+ *
+ * @param fieldName
+ * @return the bound variable, or null if the field is unknown
+ */
+ private LogicalVariable getVariableOnly(String fieldName) {
+ return fieldToLogicalVariableMap.get(fieldName);
+ }
+
+ /**
+ * Bind fieldName to the given variable in both name maps.
+ * NOTE(review): the two branches below perform the identical puts; the net
+ * effect is simply "put unless already equal", and even the skipped case
+ * would be a no-op. Kept as-is for byte-level fidelity.
+ */
+ private void updateVariable(String fieldName, LogicalVariable variable) {
+ LogicalVariable var = fieldToLogicalVariableMap.get(fieldName);
+ if (var == null) {
+ fieldToLogicalVariableMap.put(fieldName, variable);
+ nameToLogicalVariableMap.put(fieldName, variable);
+ } else if (!var.equals(variable)) {
+ fieldToLogicalVariableMap.put(fieldName, variable);
+ nameToLogicalVariableMap.put(fieldName, variable);
+ }
+ }
+
+ /**
+ * get a list of logical variables from the schema
+ *
+ * @param schema
+ * @return one variable per schema name; entries may be null for names that
+ *         were never registered — callers are expected to have registered
+ *         them (TODO confirm against the visitors)
+ */
+ @Override
+ public List<LogicalVariable> getVariablesFromSchema(Schema schema) {
+ List<LogicalVariable> variables = new ArrayList<LogicalVariable>();
+ List<String> names = schema.getNames();
+
+ for (String name : names)
+ variables.add(nameToLogicalVariableMap.get(name));
+ return variables;
+ }
+
+ /**
+ * get the live variable-to-typeinfo map (not a copy)
+ *
+ * @return
+ */
+ public HashMap<LogicalVariable, TypeInfo> getVariableContext() {
+ return this.variableToType;
+ }
+
+ /**
+ * get the number of variables minted so far, i.e. the next unused id
+ *
+ * @return currentVariable + 1
+ */
+ public int getVariableCounter() {
+ return currentVariable + 1;
+ }
+
+ /**
+ * Translate the Hive operator roots into Algebricks operators hanging off
+ * parentOperator, then record the resulting roots. The visitor chain is
+ * registered here; NOTE(review): calling translate() twice on the same
+ * instance would duplicate the visitors — presumably single-use, confirm.
+ *
+ * @param hiveRoot
+ * root Hive operators of the query DAG
+ * @param parentOperator
+ * Algebricks operator the translated plan attaches to
+ * @param aliasToPathMap
+ * table alias to PartitionDesc map, consumed by the table-scan
+ * visitor
+ */
+ public void translate(List<Operator> hiveRoot, ILogicalOperator parentOperator,
+ HashMap<String, PartitionDesc> aliasToPathMap) throws AlgebricksException {
+ /**
+ * register visitors
+ */
+ visitors.add(new FilterVisitor());
+ visitors.add(new GroupByVisitor());
+ visitors.add(new JoinVisitor());
+ visitors.add(new LateralViewJoinVisitor());
+ visitors.add(new UnionVisitor());
+ visitors.add(new LimitVisitor());
+ visitors.add(new MapJoinVisitor());
+ visitors.add(new ProjectVisitor());
+ visitors.add(new SortVisitor());
+ visitors.add(new ExtractVisitor());
+ visitors.add(new TableScanWriteVisitor(aliasToPathMap));
+
+ List<Mutable<ILogicalOperator>> refList = translate(hiveRoot, new MutableObject<ILogicalOperator>(
+ parentOperator));
+ /**
+ * Bug fix: the internal translate(...) can return null (join/map-join
+ * paths), so guard BEFORE using the list — previously
+ * insertReplicateOperator(refList) ran unconditionally and walked a
+ * null list.
+ */
+ if (refList != null) {
+ insertReplicateOperator(refList);
+ rootOperators.addAll(refList);
+ }
+ }
+
+ /**
+ * Translate an operator DAG, depth-first. Each Hive operator is down-cast
+ * to its concrete type and offered to every registered visitor until one
+ * returns a non-null Algebricks operator reference. Join, map-join and
+ * union set continueTraverse=false when no visitor accepted the operator,
+ * which defers traversal of that subtree (the visitor will resume it once
+ * all inputs are available).
+ *
+ * @param hiveRoot
+ *            Hive operators to translate at this level
+ * @param AlgebricksParentOperator
+ *            Algebricks operator the translations attach to
+ * @return the accumulated list of leaf operator references (a shared
+ *         field, also mutated by recursive calls), or null when a
+ *         join/lateral-view/map-join could not yet be translated
+ */
+ private List<Mutable<ILogicalOperator>> translate(List<Operator> hiveRoot,
+ Mutable<ILogicalOperator> AlgebricksParentOperator) throws AlgebricksException {
+
+ for (Operator hiveOperator : hiveRoot) {
+ continueTraverse = true;
+ Mutable<ILogicalOperator> currentOperatorRef = null;
+ if (hiveOperator.getType() == OperatorType.FILTER) {
+ FilterOperator fop = (FilterOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.REDUCESINK) {
+ ReduceSinkOperator fop = (ReduceSinkOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.JOIN) {
+ JoinOperator fop = (JoinOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null) {
+ continueTraverse = true;
+ break;
+ } else
+ continueTraverse = false;
+ }
+ // join not translatable yet: abort this branch
+ if (currentOperatorRef == null)
+ return null;
+ } else if (hiveOperator.getType() == OperatorType.LATERALVIEWJOIN) {
+ LateralViewJoinOperator fop = (LateralViewJoinOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ if (currentOperatorRef == null)
+ return null;
+ } else if (hiveOperator.getType() == OperatorType.MAPJOIN) {
+ MapJoinOperator fop = (MapJoinOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null) {
+ continueTraverse = true;
+ break;
+ } else
+ continueTraverse = false;
+ }
+ if (currentOperatorRef == null)
+ return null;
+ } else if (hiveOperator.getType() == OperatorType.SELECT) {
+ SelectOperator fop = (SelectOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.EXTRACT) {
+ ExtractOperator fop = (ExtractOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.GROUPBY) {
+ GroupByOperator fop = (GroupByOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.TABLESCAN) {
+ TableScanOperator fop = (TableScanOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.FILESINK) {
+ FileSinkOperator fop = (FileSinkOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(fop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.LIMIT) {
+ LimitOperator lop = (LimitOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.UDTF) {
+ UDTFOperator lop = (UDTFOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null)
+ break;
+ }
+ } else if (hiveOperator.getType() == OperatorType.UNION) {
+ UnionOperator lop = (UnionOperator) hiveOperator;
+ for (Visitor visitor : visitors) {
+ currentOperatorRef = visitor.visit(lop, AlgebricksParentOperator, this);
+ if (currentOperatorRef != null) {
+ continueTraverse = true;
+ break;
+ } else
+ continueTraverse = false;
+ }
+ }
+ // (removed a dead "else ;" — unhandled operator types simply fall
+ // through with currentOperatorRef == null)
+ if (hiveOperator.getChildOperators() != null && hiveOperator.getChildOperators().size() > 0
+ && continueTraverse) {
+ @SuppressWarnings("unchecked")
+ List<Operator> children = hiveOperator.getChildOperators();
+ if (currentOperatorRef == null)
+ currentOperatorRef = AlgebricksParentOperator;
+ translate(children, currentOperatorRef);
+ }
+ // leaf operator: its translation is a root candidate of the plan
+ if (hiveOperator.getChildOperators() == null || hiveOperator.getChildOperators().size() == 0)
+ logicalOp.add(currentOperatorRef);
+ }
+ return logicalOp;
+ }
+
+ /**
+ * used in select, group by to get no-column-expression columns
+ *
+ * @param parent
+ *            Algebricks operator the new assign hangs off
+ * @param cols
+ *            Hive expressions to project; rewritten in place
+ * @param variables
+ *            out-parameter: receives one LogicalVariable per column
+ * @return an AssignOperator computing the non-column expressions, or null
+ *         when every column was a plain column reference
+ */
+ public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent, List<ExprNodeDesc> cols,
+ ArrayList<LogicalVariable> variables) {
+
+ ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
+
+ /**
+ * variables to be appended in the assign operator
+ */
+ ArrayList<LogicalVariable> appendedVariables = new ArrayList<LogicalVariable>();
+
+ // one variable can only be assigned once
+ for (ExprNodeDesc hiveExpr : cols) {
+ rewriteExpression(hiveExpr);
+
+ if (hiveExpr instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc desc2 = (ExprNodeColumnDesc) hiveExpr;
+ String fieldName = desc2.getTabAlias() + "." + desc2.getColumn();
+
+ // System.out.println("project expr: " + fieldName);
+
+ // "$$"-prefixed names are already rewritten logical variables
+ if (fieldName.indexOf("$$") < 0) {
+ LogicalVariable var = getVariable(fieldName, hiveExpr.getTypeInfo());
+ desc2.setColumn(var.toString());
+ desc2.setTabAlias("");
+ variables.add(var);
+ } else {
+ LogicalVariable var = nameToLogicalVariableMap.get(desc2.getColumn());
+ String name = this.logicalVariableToFieldMap.get(var);
+ var = this.getVariableOnly(name);
+ variables.add(var);
+ }
+ } else {
+ // non-column expression: materialize via the assign operator;
+ // the hashCode suffix makes the synthesized field name unique
+ Mutable<ILogicalExpression> asterixExpr = translateScalarFucntion(hiveExpr);
+ expressions.add(asterixExpr);
+ LogicalVariable var = getVariable(hiveExpr.getExprString() + asterixExpr.hashCode(),
+ hiveExpr.getTypeInfo());
+ variables.add(var);
+ appendedVariables.add(var);
+ }
+ }
+
+ /**
+ * create an assign operator to deal with appending
+ */
+ ILogicalOperator assignOp = null;
+ if (appendedVariables.size() > 0) {
+ assignOp = new AssignOperator(appendedVariables, expressions);
+ assignOp.getInputs().add(parent);
+ }
+ return assignOp;
+ }
+
+ // the generated plan; null until genLogicalPlan() is called
+ private ILogicalPlan plan;
+
+ /** Build and cache the logical plan from the collected root operators. */
+ public ILogicalPlan genLogicalPlan() {
+ plan = new ALogicalPlanImpl(rootOperators);
+ return plan;
+ }
+
+ /**
+ * Pretty-print the plan and the variable rewrites to outputWriter.
+ * NOTE(review): NPEs if genLogicalPlan() has not been called first —
+ * presumably callers always call it before this; confirm.
+ */
+ public void printOperators() throws AlgebricksException {
+ LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
+ StringBuilder buffer = new StringBuilder();
+ PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
+ outputWriter.println(buffer);
+ outputWriter.println("rewritten variables: ");
+ outputWriter.flush();
+ printVariables();
+
+ }
+
+ /** Redirect diagnostic output for ALL translator instances (static). */
+ public static void setOutputPrinter(PrintWriter writer) {
+ outputWriter = writer;
+ }
+
+ /** Dump every field-name-to-variable binding, one per line. */
+ private void printVariables() {
+ Set<Entry<String, LogicalVariable>> entries = fieldToLogicalVariableMap.entrySet();
+
+ for (Entry<String, LogicalVariable> entry : entries) {
+ outputWriter.println(entry.getKey() + " -> " + entry.getValue());
+ }
+ outputWriter.flush();
+ }
+
+ /**
+ * generate the schema (names + types) for the output of an operator
+ *
+ * @param operator
+ *            The Hive operator
+ * @return a Schema built from the operator's column signature (the original
+ *         doc said "ObjectInspector", but a Schema is what is returned)
+ */
+ public Schema generateInputSchema(Operator operator) {
+ List<String> variableNames = new ArrayList<String>();
+ List<TypeInfo> typeList = new ArrayList<TypeInfo>();
+ List<ColumnInfo> columns = operator.getSchema().getSignature();
+
+ for (ColumnInfo col : columns) {
+ // typeList.add();
+ TypeInfo type = col.getType();
+ typeList.add(type);
+
+ String fieldName = col.getInternalName();
+ variableNames.add(fieldName);
+ }
+
+ return new Schema(variableNames, typeList);
+ }
+
+ /**
+ * rewrite the names of output columns for feature expression evaluators to
+ * use: each not-yet-rewritten column ("$$"-free name) is renamed to its
+ * logical variable's name, in place.
+ *
+ * @param operator
+ */
+ public void rewriteOperatorOutputSchema(Operator operator) {
+ List<ColumnInfo> columns = operator.getSchema().getSignature();
+
+ for (ColumnInfo column : columns) {
+ String columnName = column.getTabAlias() + "." + column.getInternalName();
+ if (columnName.indexOf("$$") < 0) {
+ LogicalVariable var = getVariable(columnName, column.getType());
+ column.setInternalName(var.toString());
+ }
+ }
+ }
+
+ /**
+ * Rewrite the operator's output columns to the caller-supplied variables,
+ * positionally. The variable list must match the column signature 1:1;
+ * otherwise the translation is inconsistent and we fail fast.
+ */
+ @Override
+ public void rewriteOperatorOutputSchema(List<LogicalVariable> variables, Operator operator) {
+
+ //printOperatorSchema(operator);
+ List<ColumnInfo> columns = operator.getSchema().getSignature();
+ if (variables.size() != columns.size()) {
+ throw new IllegalStateException("output cardinality error " + operator.getName() + " variable size: "
+ + variables.size() + " expected " + columns.size());
+ }
+
+ for (int i = 0; i < variables.size(); i++) {
+ LogicalVariable var = variables.get(i);
+ ColumnInfo column = columns.get(i);
+ String fieldName = column.getTabAlias() + "." + column.getInternalName();
+ if (fieldName.indexOf("$$") < 0) {
+ updateVariable(fieldName, var);
+ column.setInternalName(var.toString());
+ }
+ }
+ //printOperatorSchema(operator);
+ }
+
+ /**
+ * rewrite an expression tree in place, substituting column references with
+ * their logical variable names (recursing into children otherwise)
+ *
+ * @param expr
+ *            hive expression
+ */
+ public void rewriteExpression(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
+ String fieldName = desc.getTabAlias() + "." + desc.getColumn();
+ if (fieldName.indexOf("$$") < 0) {
+ LogicalVariable var = getVariableOnly(fieldName);
+ // fall back through alternative alias spellings: empty alias
+ // ("." prefix), then the literal "null" alias — presumably how
+ // Hive renders a missing table alias; confirm against Hive 0.7
+ if (var == null) {
+ fieldName = "." + desc.getColumn();
+ var = getVariableOnly(fieldName);
+ if (var == null) {
+ fieldName = "null." + desc.getColumn();
+ var = getVariableOnly(fieldName);
+ if (var == null) {
+ throw new IllegalStateException(fieldName + " is wrong!!! ");
+ }
+ }
+ }
+ String name = this.logicalVariableToFieldMap.get(var);
+ var = getVariableOnly(name);
+ desc.setColumn(var.toString());
+ }
+ } else {
+ if (expr.getChildren() != null && expr.getChildren().size() > 0) {
+ List<ExprNodeDesc> children = expr.getChildren();
+ for (ExprNodeDesc desc : children)
+ rewriteExpression(desc);
+ }
+ }
+ }
+
+ /**
+ * rewrite an expression tree in place, substituting column references with
+ * variable names. Unlike rewriteExpression, this version tries only the
+ * exact "alias.column" spelling and performs no fallback lookups; an
+ * unknown field yields an NPE on var.toString() rather than a descriptive
+ * error.
+ *
+ * @param expr
+ *            hive expression
+ */
+ public void rewriteExpressionPartial(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc desc = (ExprNodeColumnDesc) expr;
+ String fieldName = desc.getTabAlias() + "." + desc.getColumn();
+ if (fieldName.indexOf("$$") < 0) {
+ LogicalVariable var = getVariableOnly(fieldName);
+ desc.setColumn(var.toString());
+ }
+ } else {
+ if (expr.getChildren() != null && expr.getChildren().size() > 0) {
+ List<ExprNodeDesc> children = expr.getChildren();
+ for (ExprNodeDesc desc : children)
+ rewriteExpressionPartial(desc);
+ }
+ }
+ }
+
+ // debugging helper, intentionally disabled; kept for future diagnostics
+ // private void printOperatorSchema(Operator operator) {
+ // // System.out.println(operator.getName());
+ // // List<ColumnInfo> columns = operator.getSchema().getSignature();
+ // // for (ColumnInfo column : columns) {
+ // // System.out.print(column.getTabAlias() + "." +
+ // // column.getInternalName() + " ");
+ // // }
+ // // System.out.println();
+ // }
+
+ /**
+ * translate a Hive scalar expression into an Algebricks expression,
+ * recursively. Handles generic-UDF calls, column references, field access,
+ * constants and null literals; anything else is a hard error.
+ * NOTE(review): the method name carries a typo ("Fucntion") but is part of
+ * the public surface — renaming would break callers in the visitor classes.
+ *
+ * @param hiveExpr
+ * @return a mutable reference to the translated expression
+ */
+ public Mutable<ILogicalExpression> translateScalarFucntion(ExprNodeDesc hiveExpr) {
+ ILogicalExpression AlgebricksExpr;
+
+ if (hiveExpr instanceof ExprNodeGenericFuncDesc) {
+ List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
+ List<ExprNodeDesc> children = hiveExpr.getChildren();
+
+ for (ExprNodeDesc child : children)
+ arguments.add(translateScalarFucntion(child));
+
+ ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc) hiveExpr;
+ GenericUDF genericUdf = funcExpr.getGenericUDF();
+ UDF udf = null;
+ if (genericUdf instanceof GenericUDFBridge) {
+ // a bridge wraps a classic UDF; unwrap it so the function id is
+ // derived from the real UDF class
+ GenericUDFBridge bridge = (GenericUDFBridge) genericUdf;
+ try {
+ udf = bridge.getUdfClass().newInstance();
+ } catch (Exception e) {
+ // NOTE(review): swallowed — on reflection failure we silently
+ // fall back to the bridge class for the function id below
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * set up the hive function
+ */
+ Object hiveFunction = genericUdf;
+ if (udf != null)
+ hiveFunction = udf;
+
+ FunctionIdentifier funcId = HiveAlgebricksBuiltInFunctionMap.INSTANCE.getAlgebricksFunctionId(hiveFunction
+ .getClass());
+ if (funcId == null) {
+ // not a built-in: identify the function by its class name
+ funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, hiveFunction.getClass().getName());
+ }
+
+ Object functionInfo = null;
+ if (genericUdf instanceof GenericUDFBridge) {
+ functionInfo = funcExpr;
+ }
+
+ /**
+ * generate the function call expression
+ */
+ ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(
+ funcId, functionInfo), arguments);
+ AlgebricksExpr = AlgebricksFuncExpr;
+
+ } else if (hiveExpr instanceof ExprNodeColumnDesc) {
+ // column reference: expected to be rewritten already, so the column
+ // name resolves through nameToLogicalVariableMap
+ ExprNodeColumnDesc column = (ExprNodeColumnDesc) hiveExpr;
+ LogicalVariable var = this.getVariable(column.getColumn());
+ AlgebricksExpr = new VariableReferenceExpression(var);
+
+ } else if (hiveExpr instanceof ExprNodeFieldDesc) {
+ // struct field access becomes a hivesterix FIELDACCESS call carrying
+ // the original descriptor as its function info
+ FunctionIdentifier funcId;
+ funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.FIELDACCESS);
+
+ ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(
+ funcId, hiveExpr));
+ AlgebricksExpr = AlgebricksFuncExpr;
+ } else if (hiveExpr instanceof ExprNodeConstantDesc) {
+ ExprNodeConstantDesc hiveConst = (ExprNodeConstantDesc) hiveExpr;
+ Object value = hiveConst.getValue();
+ AlgebricksExpr = new ConstantExpression(new HivesterixConstantValue(value));
+ } else if (hiveExpr instanceof ExprNodeNullDesc) {
+ FunctionIdentifier funcId;
+ funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, ExpressionConstant.NULL);
+
+ ScalarFunctionCallExpression AlgebricksFuncExpr = new ScalarFunctionCallExpression(new HiveFunctionInfo(
+ funcId, hiveExpr));
+
+ AlgebricksExpr = AlgebricksFuncExpr;
+ } else {
+ throw new IllegalStateException("unknown hive expression");
+ }
+ return new MutableObject<ILogicalExpression>(AlgebricksExpr);
+ }
+
+ /**
+ * translate an aggregation function expression; the function id encodes
+ * both the UDAF name and the aggregation mode, e.g. "sum(PARTIAL1)"
+ *
+ * @param aggregateDesc
+ * @return a mutable reference to the aggregate call expression
+ */
+ public Mutable<ILogicalExpression> translateAggregation(AggregationDesc aggregateDesc) {
+
+ String UDAFName = aggregateDesc.getGenericUDAFName();
+
+ List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
+ List<ExprNodeDesc> children = aggregateDesc.getParameters();
+
+ for (ExprNodeDesc child : children)
+ arguments.add(translateScalarFucntion(child));
+
+ FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDAFName + "("
+ + aggregateDesc.getMode() + ")");
+ HiveFunctionInfo funcInfo = new HiveFunctionInfo(funcId, aggregateDesc);
+ AggregateFunctionCallExpression aggregationExpression = new AggregateFunctionCallExpression(funcInfo, false,
+ arguments);
+ return new MutableObject<ILogicalExpression>(aggregationExpression);
+ }
+
+ /**
+ * translate a table-generating (UDTF) function into an unnesting function
+ * call taking the single given argument
+ *
+ * @param udtfDesc
+ *            the Hive UDTF descriptor
+ * @param argument
+ *            the already-translated input expression
+ * @return a mutable reference to the unnesting call expression
+ */
+ public Mutable<ILogicalExpression> translateUnnestFunction(UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument) {
+
+ String UDTFName = udtfDesc.getUDTFName();
+
+ FunctionIdentifier funcId = new FunctionIdentifier(ExpressionConstant.NAMESPACE, UDTFName);
+ UnnestingFunctionCallExpression unnestingExpression = new UnnestingFunctionCallExpression(new HiveFunctionInfo(
+ funcId, udtfDesc));
+ unnestingExpression.getArguments().add(argument);
+ return new MutableObject<ILogicalExpression>(unnestingExpression);
+ }
+
+ /**
+ * get typeinfo recorded for a variable (null if unknown)
+ */
+ @Override
+ public TypeInfo getType(LogicalVariable var) {
+ return variableToType.get(var);
+ }
+
+ /**
+ * get variable from variable name (LogicalVariable.toString() form);
+ * null if unknown
+ */
+ @Override
+ public LogicalVariable getVariable(String name) {
+ return nameToLogicalVariableMap.get(name);
+ }
+
+ /** Lookup by Hive field name ("alias.column"); null if unknown. */
+ @Override
+ public LogicalVariable getVariableFromFieldName(String fieldName) {
+ return this.getVariableOnly(fieldName);
+ }
+
+ /**
+ * set the metadata provider
+ */
+ @Override
+ public void setMetadataProvider(IMetadataProvider<PartitionDesc, Object> metadata) {
+ this.metaData = metadata;
+ }
+
+ /**
+ * insert a ReplicateOperator wherever one operator feeds multiple parents,
+ * so that each parent reads from the replicate instead of sharing the
+ * child reference directly
+ */
+ private void insertReplicateOperator(List<Mutable<ILogicalOperator>> roots) {
+ Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childToParentsMap = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
+ buildChildToParentsMapping(roots, childToParentsMap);
+ for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : childToParentsMap.entrySet()) {
+ List<Mutable<ILogicalOperator>> pList = entry.getValue();
+ // only children with more than one parent need a replicate
+ if (pList.size() > 1) {
+ ILogicalOperator rop = new ReplicateOperator(pList.size());
+ Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
+ Mutable<ILogicalOperator> childRef = entry.getKey();
+ rop.getInputs().add(childRef);
+ // splice the replicate between the child and each parent
+ for (Mutable<ILogicalOperator> parentRef : pList) {
+ ILogicalOperator parentOp = parentRef.getValue();
+ int index = parentOp.getInputs().indexOf(childRef);
+ parentOp.getInputs().set(index, ropRef);
+ }
+ }
+ }
+ }
+
+ /**
+ * build the mapping from child to Parents by walking the DAG top-down;
+ * shared children are visited once per parent but recorded only once
+ *
+ * @param roots
+ * @param childToParentsMap
+ */
+ private void buildChildToParentsMapping(List<Mutable<ILogicalOperator>> roots,
+ Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> map) {
+ for (Mutable<ILogicalOperator> opRef : roots) {
+ List<Mutable<ILogicalOperator>> childRefs = opRef.getValue().getInputs();
+ for (Mutable<ILogicalOperator> childRef : childRefs) {
+ List<Mutable<ILogicalOperator>> parentList = map.get(childRef);
+ if (parentList == null) {
+ parentList = new ArrayList<Mutable<ILogicalOperator>>();
+ map.put(childRef, parentList);
+ }
+ if (!parentList.contains(opRef))
+ parentList.add(opRef);
+ }
+ buildChildToParentsMapping(childRefs, map);
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveLogicalPlanAndMetaData.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveLogicalPlanAndMetaData.java
new file mode 100644
index 0000000..d5801a3
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveLogicalPlanAndMetaData.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.logical.plan;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+/**
+ * Simple value holder pairing a translated logical plan with the metadata
+ * provider used to compile it.
+ */
+public class HiveLogicalPlanAndMetaData implements ILogicalPlanAndMetadata {
+
+ IMetadataProvider metadata;
+ ILogicalPlan plan;
+
+ public HiveLogicalPlanAndMetaData(ILogicalPlan plan, IMetadataProvider metadata) {
+ this.plan = plan;
+ this.metadata = metadata;
+ }
+
+ @Override
+ public IMetadataProvider getMetadataProvider() {
+ return metadata;
+ }
+
+ @Override
+ public ILogicalPlan getPlan() {
+ return plan;
+ }
+
+ /**
+ * Not implemented: cluster locations are unknown at this layer.
+ * NOTE(review): returns null — confirm all callers tolerate a null
+ * partition constraint.
+ */
+ @Override
+ public AlgebricksPartitionConstraint getClusterLocations() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveOperatorAnnotations.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveOperatorAnnotations.java
new file mode 100644
index 0000000..0ea4e01
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/HiveOperatorAnnotations.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.hivesterix.logical.plan;
+
+public class HiveOperatorAnnotations {
+
+ // hints
+ public static final String LOCAL_GROUP_BY = "LOCAL_GROUP_BY";
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/ExtractVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/ExtractVisitor.java
new file mode 100644
index 0000000..1c67bae
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/ExtractVisitor.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+/**
+ * Translates a Hive ExtractOperator. No Algebricks operator is generated
+ * (the visit returns null); the visitor only copies the single parent's
+ * schema onto the Hive operator and rewrites its output schema to the
+ * translator's logical variables, since an extract does not change the schema.
+ */
+public class ExtractVisitor extends DefaultVisitor {
+
+ @Override
+ public Mutable<ILogicalOperator> visit(ExtractOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) {
+ Schema currentSchema = t.generateInputSchema(operator.getParentOperators().get(0));
+ // pass-through: the output schema is exactly the parent's schema
+ operator.setSchema(operator.getParentOperators().get(0).getSchema());
+ List<LogicalVariable> latestOutputSchema = t.getVariablesFromSchema(currentSchema);
+ t.rewriteOperatorOutputSchema(latestOutputSchema, operator);
+ return null;
+ }
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/FilterVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/FilterVisitor.java
new file mode 100644
index 0000000..9279144
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/FilterVisitor.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+
+/**
+ * Translates a Hive FilterOperator into an Algebricks SelectOperator whose
+ * condition is the translated filter predicate. The Hive operator's output
+ * schema is copied from its (single) parent, since filtering does not change
+ * the schema.
+ */
+public class FilterVisitor extends DefaultVisitor {
+
+ @Override
+ public Mutable<ILogicalOperator> visit(FilterOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) {
+ Schema currentSchema = t.generateInputSchema(operator.getParentOperators().get(0));
+
+ FilterDesc desc = (FilterDesc) operator.getConf();
+ ExprNodeDesc predicate = desc.getPredicate();
+ // rewrite column references in the predicate to translator variables
+ t.rewriteExpression(predicate);
+
+ Mutable<ILogicalExpression> exprs = t.translateScalarFucntion(desc.getPredicate());
+ ILogicalOperator currentOperator = new SelectOperator(exprs);
+ currentOperator.getInputs().add(AlgebricksParentOperatorRef);
+
+ // populate the schema from upstream operator
+ operator.setSchema(operator.getParentOperators().get(0).getSchema());
+ List<LogicalVariable> latestOutputSchema = t.getVariablesFromSchema(currentSchema);
+ t.rewriteOperatorOutputSchema(latestOutputSchema, operator);
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/GroupByVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/GroupByVisitor.java
new file mode 100644
index 0000000..b7d5779
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/GroupByVisitor.java
@@ -0,0 +1,264 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.common.config.ConfUtil;
+import edu.uci.ics.hivesterix.logical.plan.HiveOperatorAnnotations;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+
+/**
+ * Translates a Hive group-by into Algebricks operators: an Algebricks
+ * GroupByOperator with a nested aggregation sub-plan, or a DistinctOperator
+ * when there are no aggregators. The visitor is stateful across visits:
+ * the map-side visit (PARTIAL1/HASH/COMPLETE) records the generated aggregate
+ * expressions and local aggregate function infos, and the later reduce-side
+ * visit patches them into two-step aggregation.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class GroupByVisitor extends DefaultVisitor {
+
+ // aggregate expressions emitted by the map-side visit; the reduce-side
+ // visit rewires these into two-step aggregates
+ private List<Mutable<ILogicalExpression>> AlgebricksAggs = new ArrayList<Mutable<ILogicalExpression>>();
+ // step-one (local) aggregate function infos, parallel to AlgebricksAggs
+ private List<IFunctionInfo> localAggs = new ArrayList<IFunctionInfo>();
+ // true when an aggregator-less group-by was translated into DISTINCT
+ private boolean isDistinct = false;
+ // true when the group-by keys differ from the downstream reduce-sink
+ // partition keys, i.e. this group-by is only local
+ private boolean gbyKeyNotRedKey = false;
+
+ @Override
+ public Mutable<ILogicalOperator> visit(GroupByOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException {
+
+ // get descriptors
+ GroupByDesc desc = (GroupByDesc) operator.getConf();
+ GroupByDesc.Mode mode = desc.getMode();
+
+ List<ExprNodeDesc> keys = desc.getKeys();
+ List<AggregationDesc> aggregators = desc.getAggregators();
+
+ Operator child = operator.getChildOperators().get(0);
+
+ // a key-count mismatch with the reduce sink's partition columns means
+ // this group-by cannot be the final (global) one
+ if (child.getType() == OperatorType.REDUCESINK) {
+ List<ExprNodeDesc> partKeys = ((ReduceSinkDesc) child.getConf()).getPartitionCols();
+ if (keys.size() != partKeys.size())
+ gbyKeyNotRedKey = true;
+ }
+
+ // map-side / complete group-by: build the Algebricks group-by here;
+ // otherwise (reduce side) only patch the previously built aggregates
+ if (mode == GroupByDesc.Mode.PARTIAL1 || mode == GroupByDesc.Mode.HASH || mode == GroupByDesc.Mode.COMPLETE
+ || (aggregators.size() == 0 && isDistinct == false) || gbyKeyNotRedKey) {
+ AlgebricksAggs.clear();
+ // add an assign operator if the key is not a column expression
+ ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>();
+ ILogicalOperator currentOperator = null;
+ ILogicalOperator assignOperator = t.getAssignOperator(AlgebricksParentOperatorRef, keys, keyVariables);
+ if (assignOperator != null) {
+ currentOperator = assignOperator;
+ AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+ // get key variable expression list
+ List<Mutable<ILogicalExpression>> keyExprs = new ArrayList<Mutable<ILogicalExpression>>();
+ for (LogicalVariable var : keyVariables) {
+ keyExprs.add(t.translateScalarFucntion(new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, var
+ .toString(), "", false)));
+ }
+
+ // no aggregators: a group-by on keys only is a DISTINCT
+ if (aggregators.size() == 0) {
+ List<Mutable<ILogicalExpression>> distinctExprs = new ArrayList<Mutable<ILogicalExpression>>();
+ for (LogicalVariable var : keyVariables) {
+ Mutable<ILogicalExpression> varExpr = new MutableObject<ILogicalExpression>(
+ new VariableReferenceExpression(var));
+ distinctExprs.add(varExpr);
+ }
+ t.rewriteOperatorOutputSchema(keyVariables, operator);
+ isDistinct = true;
+ ILogicalOperator lop = new DistinctOperator(distinctExprs);
+ lop.getInputs().add(AlgebricksParentOperatorRef);
+ return new MutableObject<ILogicalOperator>(lop);
+ }
+
+ // get the pair<LogicalVariable, ILogicalExpression> list
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> keyParameters = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ keyVariables.clear();
+ for (Mutable<ILogicalExpression> expr : keyExprs) {
+ LogicalVariable keyVar = t.getVariable(expr.getValue().toString(), TypeInfoFactory.unknownTypeInfo);
+ keyParameters.add(new Pair(keyVar, expr));
+ keyVariables.add(keyVar);
+ }
+
+ // get the parameters for the aggregator operator
+ ArrayList<LogicalVariable> aggVariables = new ArrayList<LogicalVariable>();
+ ArrayList<Mutable<ILogicalExpression>> aggExprs = new ArrayList<Mutable<ILogicalExpression>>();
+
+ // get the type of each aggregation function; aggregation output
+ // columns follow the key columns in the Hive operator schema
+ HashMap<AggregationDesc, TypeInfo> aggToType = new HashMap<AggregationDesc, TypeInfo>();
+ List<ColumnInfo> columns = operator.getSchema().getSignature();
+ int offset = keys.size();
+ for (int i = offset; i < columns.size(); i++) {
+ aggToType.put(aggregators.get(i - offset), columns.get(i).getType());
+ }
+
+ localAggs.clear();
+ // rewrite parameter expressions for all aggregators
+ for (AggregationDesc aggregator : aggregators) {
+ for (ExprNodeDesc parameter : aggregator.getParameters()) {
+ t.rewriteExpression(parameter);
+ }
+ Mutable<ILogicalExpression> aggExpr = t.translateAggregation(aggregator);
+ AbstractFunctionCallExpression localAggExpr = (AbstractFunctionCallExpression) aggExpr.getValue();
+ localAggs.add(localAggExpr.getFunctionInfo());
+
+ // also build a COMPLETE-mode version of the aggregation for the
+ // final (one-step) aggregate expression
+ AggregationDesc logicalAgg = new AggregationDesc(aggregator.getGenericUDAFName(),
+ aggregator.getGenericUDAFEvaluator(), aggregator.getParameters(), aggregator.getDistinct(),
+ Mode.COMPLETE);
+ Mutable<ILogicalExpression> logicalAggExpr = t.translateAggregation(logicalAgg);
+
+ AlgebricksAggs.add(logicalAggExpr);
+ if (!gbyKeyNotRedKey)
+ aggExprs.add(logicalAggExpr);
+ else
+ aggExprs.add(aggExpr);
+
+ aggVariables.add(t.getVariable(aggregator.getExprString() + aggregator.getMode(),
+ aggToType.get(aggregator)));
+ }
+
+ if (child.getType() != OperatorType.REDUCESINK)
+ gbyKeyNotRedKey = false;
+
+ // build the nested aggregation sub-plan: NestedTupleSource -> Aggregate
+ AggregateOperator aggOperator = new AggregateOperator(aggVariables, aggExprs);
+ NestedTupleSourceOperator nestedTupleSource = new NestedTupleSourceOperator(
+ new MutableObject<ILogicalOperator>());
+ aggOperator.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSource));
+
+ List<Mutable<ILogicalOperator>> subRoots = new ArrayList<Mutable<ILogicalOperator>>();
+ subRoots.add(new MutableObject<ILogicalOperator>(aggOperator));
+ ILogicalPlan subPlan = new ALogicalPlanImpl(subRoots);
+ List<ILogicalPlan> subPlans = new ArrayList<ILogicalPlan>();
+ subPlans.add(subPlan);
+
+ // create the group by operator
+ currentOperator = new edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator(
+ keyParameters, new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(), subPlans);
+ currentOperator.getInputs().add(AlgebricksParentOperatorRef);
+ nestedTupleSource.getDataSourceReference().setValue(currentOperator);
+
+ List<LogicalVariable> outputVariables = new ArrayList<LogicalVariable>();
+ outputVariables.addAll(keyVariables);
+ outputVariables.addAll(aggVariables);
+ t.rewriteOperatorOutputSchema(outputVariables, operator);
+
+ if (gbyKeyNotRedKey) {
+ currentOperator.getAnnotations().put(HiveOperatorAnnotations.LOCAL_GROUP_BY, Boolean.TRUE);
+ }
+
+ // external (spilling) group-by is only safe when every aggregation
+ // buffer is composed solely of primitive fields
+ HiveConf conf = ConfUtil.getHiveConf();
+ Boolean extGby = conf.getBoolean("hive.algebricks.groupby.external", false);
+
+ if (extGby && isSerializable(aggregators)) {
+ currentOperator.getAnnotations().put(OperatorAnnotations.USE_EXTERNAL_GROUP_BY, Boolean.TRUE);
+ }
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ } else {
+ // reduce side of a two-step aggregation: no new operator is created;
+ // patch the previously emitted aggregates into two-step form
+ isDistinct = false;
+ // rewrite parameter expressions for all aggregators
+ int i = 0;
+ for (AggregationDesc aggregator : aggregators) {
+ for (ExprNodeDesc parameter : aggregator.getParameters()) {
+ t.rewriteExpression(parameter);
+ }
+ Mutable<ILogicalExpression> agg = t.translateAggregation(aggregator);
+ AggregateFunctionCallExpression originalAgg = (AggregateFunctionCallExpression) AlgebricksAggs.get(i)
+ .getValue();
+ originalAgg.setStepOneAggregate(localAggs.get(i));
+ AggregateFunctionCallExpression currentAgg = (AggregateFunctionCallExpression) agg.getValue();
+ if (currentAgg.getFunctionInfo() != null) {
+ originalAgg.setTwoStep(true);
+ originalAgg.setStepTwoAggregate(currentAgg.getFunctionInfo());
+ }
+ i++;
+ }
+ return null;
+ }
+ }
+
+ /**
+ * A reduce sink feeding a group-by contributes no Algebricks operator;
+ * it only rewrites the key/value columns into translator variables and
+ * records them as the Hive operator's output schema.
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(ReduceSinkOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) {
+ Operator downStream = (Operator) operator.getChildOperators().get(0);
+ if (!(downStream instanceof GroupByOperator)) {
+ return null;
+ }
+
+ ReduceSinkDesc desc = (ReduceSinkDesc) operator.getConf();
+ List<ExprNodeDesc> keys = desc.getKeyCols();
+ List<ExprNodeDesc> values = desc.getValueCols();
+
+ // insert assign for keys
+ ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>();
+ t.getAssignOperator(AlgebricksParentOperatorRef, keys, keyVariables);
+
+ // insert assign for values
+ ArrayList<LogicalVariable> valueVariables = new ArrayList<LogicalVariable>();
+ t.getAssignOperator(AlgebricksParentOperatorRef, values, valueVariables);
+
+ ArrayList<LogicalVariable> columns = new ArrayList<LogicalVariable>();
+ columns.addAll(keyVariables);
+ columns.addAll(valueVariables);
+
+ t.rewriteOperatorOutputSchema(columns, operator);
+ return null;
+ }
+
+ /**
+ * Returns true when every aggregator's aggregation buffer consists only of
+ * primitive fields (int/long/float/double/boolean), via reflection; such
+ * buffers are safe to use with the external (spilling) group-by.
+ */
+ private boolean isSerializable(List<AggregationDesc> descs) throws AlgebricksException {
+ try {
+ for (AggregationDesc desc : descs) {
+ GenericUDAFEvaluator udaf = desc.getGenericUDAFEvaluator();
+ AggregationBuffer buf = udaf.getNewAggregationBuffer();
+ Class<?> bufferClass = buf.getClass();
+ Field[] fields = bufferClass.getDeclaredFields();
+ for (Field field : fields) {
+ field.setAccessible(true);
+ String type = field.getType().toString();
+ if (!(type.equals("int") || type.equals("long") || type.equals("float") || type.equals("double") || type
+ .equals("boolean"))) {
+ return false;
+ }
+ }
+
+ }
+ return true;
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/JoinVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/JoinVisitor.java
new file mode 100644
index 0000000..ef346bc
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/JoinVisitor.java
@@ -0,0 +1,417 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+
+/**
+ * Translates Hive join plans (ReduceSinkOperators feeding a JoinOperator)
+ * into an Algebricks join tree topped by assign/project operators. The
+ * visitor is stateful across visits: each reduce sink records its key
+ * variables, field names and types, and the join tree is only emitted once
+ * all parents of the Hive join have been visited.
+ */
+@SuppressWarnings("rawtypes")
+public class JoinVisitor extends DefaultVisitor {
+
+ /**
+ * reduce sink operator to its key variables
+ */
+ private HashMap<Operator, List<LogicalVariable>> reduceSinkToKeyVariables = new HashMap<Operator, List<LogicalVariable>>();
+
+ /**
+ * reduce sink operator to its output field names
+ */
+ private HashMap<Operator, List<String>> reduceSinkToFieldNames = new HashMap<Operator, List<String>>();
+
+ /**
+ * reduce sink operator to its output field types
+ */
+ private HashMap<Operator, List<TypeInfo>> reduceSinkToTypes = new HashMap<Operator, List<TypeInfo>>();
+
+ /**
+ * map a join operator (in hive) to its parent operators (in hive)
+ */
+ private HashMap<Operator, List<Operator>> operatorToHiveParents = new HashMap<Operator, List<Operator>>();
+
+ /**
+ * map a join operator (in hive) to its parent operators (in asterix)
+ */
+ private HashMap<Operator, List<ILogicalOperator>> operatorToAsterixParents = new HashMap<Operator, List<ILogicalOperator>>();
+
+ /**
+ * the latest traversed reduce sink operator
+ */
+ private Operator latestReduceSink = null;
+
+ /**
+ * the latest generated parent for join
+ */
+ private ILogicalOperator latestAlgebricksOperator = null;
+
+ /**
+ * process a join operator; emits the Algebricks join tree only when all
+ * Hive parents of this join have been visited, otherwise returns null
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(JoinOperator operator, Mutable<ILogicalOperator> AlgebricksParentOperator,
+ Translator t) {
+ latestAlgebricksOperator = AlgebricksParentOperator.getValue();
+ translateJoinOperatorPreprocess(operator, t);
+ List<Operator> parents = operatorToHiveParents.get(operator);
+ if (parents.size() < operator.getParentOperators().size()) {
+ return null;
+ } else {
+ ILogicalOperator joinOp = translateJoinOperator(operator, AlgebricksParentOperator, t);
+ // clearStatus();
+ return new MutableObject<ILogicalOperator>(joinOp);
+ }
+ }
+
+ /**
+ * reorder both parent lists in place so they follow the join's tag order
+ */
+ private void reorder(Byte[] order, List<ILogicalOperator> parents, List<Operator> hiveParents) {
+ ILogicalOperator[] lops = new ILogicalOperator[parents.size()];
+ Operator[] ops = new Operator[hiveParents.size()];
+
+ for (Operator op : hiveParents) {
+ ReduceSinkOperator rop = (ReduceSinkOperator) op;
+ ReduceSinkDesc rdesc = rop.getConf();
+ int tag = rdesc.getTag();
+
+ // NOTE(review): if the tag is absent from order, index stays -1 and
+ // the array store below throws -- assumes tags always appear in order
+ int index = -1;
+ for (int i = 0; i < order.length; i++)
+ if (order[i] == tag) {
+ index = i;
+ break;
+ }
+ lops[index] = parents.get(hiveParents.indexOf(op));
+ ops[index] = op;
+ }
+
+ parents.clear();
+ hiveParents.clear();
+
+ for (int i = 0; i < lops.length; i++) {
+ parents.add(lops[i]);
+ hiveParents.add(ops[i]);
+ }
+ }
+
+ /**
+ * translate a hive join operator to asterix join operator->assign
+ * operator->project operator
+ *
+ * @param parentOperator
+ * @param operator
+ * @return the topmost generated operator (a ProjectOperator)
+ */
+ private ILogicalOperator translateJoinOperator(Operator operator, Mutable<ILogicalOperator> parentOperator,
+ Translator t) {
+
+ JoinDesc joinDesc = (JoinDesc) operator.getConf();
+
+ // get the projection expression (already re-written) from each source
+ // table
+ Map<Byte, List<ExprNodeDesc>> exprMap = joinDesc.getExprs();
+ reorder(joinDesc.getTagOrder(), operatorToAsterixParents.get(operator), operatorToHiveParents.get(operator));
+
+ // make a reduce join operator
+ ILogicalOperator currentOperator = generateJoinTree(joinDesc.getCondsList(),
+ operatorToAsterixParents.get(operator), operatorToHiveParents.get(operator), 0, t);
+ parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
+
+ // add assign and project operator on top of a join
+ // output variables
+ ArrayList<LogicalVariable> variables = new ArrayList<LogicalVariable>();
+ Set<Entry<Byte, List<ExprNodeDesc>>> entries = exprMap.entrySet();
+ Iterator<Entry<Byte, List<ExprNodeDesc>>> iterator = entries.iterator();
+ while (iterator.hasNext()) {
+ List<ExprNodeDesc> outputExprs = iterator.next().getValue();
+ ILogicalOperator assignOperator = t.getAssignOperator(parentOperator, outputExprs, variables);
+
+ if (assignOperator != null) {
+ currentOperator = assignOperator;
+ parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
+ }
+ }
+
+ ILogicalOperator po = new ProjectOperator(variables);
+ po.getInputs().add(parentOperator);
+ t.rewriteOperatorOutputSchema(variables, operator);
+ return po;
+ }
+
+ /**
+ * deal with reduce sink operator for the case of join
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(ReduceSinkOperator operator, Mutable<ILogicalOperator> parentOperator,
+ Translator t) {
+
+ Operator downStream = (Operator) operator.getChildOperators().get(0);
+ if (!(downStream instanceof JoinOperator))
+ return null;
+
+ ReduceSinkDesc desc = (ReduceSinkDesc) operator.getConf();
+ List<ExprNodeDesc> keys = desc.getKeyCols();
+ List<ExprNodeDesc> values = desc.getValueCols();
+ List<ExprNodeDesc> partitionCols = desc.getPartitionCols();
+
+ /**
+ * rewrite key, value, partition-column expressions
+ */
+ for (ExprNodeDesc key : keys)
+ t.rewriteExpression(key);
+ for (ExprNodeDesc value : values)
+ t.rewriteExpression(value);
+ for (ExprNodeDesc col : partitionCols)
+ t.rewriteExpression(col);
+
+ ILogicalOperator currentOperator = null;
+
+ // add assign operator for keys if necessary
+ ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>();
+ ILogicalOperator assignOperator = t.getAssignOperator(parentOperator, keys, keyVariables);
+ if (assignOperator != null) {
+ currentOperator = assignOperator;
+ parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+ // add assign operator for values if necessary
+ ArrayList<LogicalVariable> variables = new ArrayList<LogicalVariable>();
+ assignOperator = t.getAssignOperator(parentOperator, values, variables);
+ if (assignOperator != null) {
+ currentOperator = assignOperator;
+ parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+ // unified schema: key, value (duplicates of keys are skipped)
+ ArrayList<LogicalVariable> unifiedKeyValues = new ArrayList<LogicalVariable>();
+ unifiedKeyValues.addAll(keyVariables);
+ for (LogicalVariable value : variables)
+ if (keyVariables.indexOf(value) < 0)
+ unifiedKeyValues.add(value);
+
+ // insert projection operator, it is a *must*,
+ // in hive, reduce sink sometimes also do the projection operator's
+ // task
+ currentOperator = new ProjectOperator(unifiedKeyValues);
+ currentOperator.getInputs().add(parentOperator);
+ parentOperator = new MutableObject<ILogicalOperator>(currentOperator);
+
+ // record keys/field names/types for the downstream join
+ reduceSinkToKeyVariables.put(operator, keyVariables);
+ List<String> fieldNames = new ArrayList<String>();
+ List<TypeInfo> types = new ArrayList<TypeInfo>();
+ for (LogicalVariable var : unifiedKeyValues) {
+ fieldNames.add(var.toString());
+ types.add(t.getType(var));
+ }
+ reduceSinkToFieldNames.put(operator, fieldNames);
+ reduceSinkToTypes.put(operator, types);
+ t.rewriteOperatorOutputSchema(variables, operator);
+
+ latestAlgebricksOperator = currentOperator;
+ latestReduceSink = operator;
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+ /**
+ * partial rewrite a join operator: rewrite this branch's projection
+ * expressions and record the just-visited reduce sink (and its Algebricks
+ * counterpart) as a parent of the join
+ *
+ * @param operator
+ * @param t
+ */
+ private void translateJoinOperatorPreprocess(Operator operator, Translator t) {
+ JoinDesc desc = (JoinDesc) operator.getConf();
+ ReduceSinkDesc reduceSinkDesc = (ReduceSinkDesc) latestReduceSink.getConf();
+ int tag = reduceSinkDesc.getTag();
+
+ Map<Byte, List<ExprNodeDesc>> exprMap = desc.getExprs();
+ List<ExprNodeDesc> exprs = exprMap.get(Byte.valueOf((byte) tag));
+
+ for (ExprNodeDesc expr : exprs)
+ t.rewriteExpression(expr);
+
+ List<Operator> parents = operatorToHiveParents.get(operator);
+ if (parents == null) {
+ parents = new ArrayList<Operator>();
+ operatorToHiveParents.put(operator, parents);
+ }
+ parents.add(latestReduceSink);
+
+ List<ILogicalOperator> asterixParents = operatorToAsterixParents.get(operator);
+ if (asterixParents == null) {
+ asterixParents = new ArrayList<ILogicalOperator>();
+ operatorToAsterixParents.put(operator, asterixParents);
+ }
+ asterixParents.add(latestAlgebricksOperator);
+ }
+
+ // generate a join tree from a list of exchange/reducesink operator
+ // both exchanges and reduce sinks have the same order; recursion peels
+ // one input off at a time until a two-input base case remains
+ private ILogicalOperator generateJoinTree(List<JoinCondDesc> conds, List<ILogicalOperator> exchanges,
+ List<Operator> reduceSinks, int offset, Translator t) {
+ // get a list of reduce sink descs (input descs)
+ int inputSize = reduceSinks.size() - offset;
+
+ if (inputSize == 2) {
+ ILogicalOperator currentRoot;
+
+ List<ReduceSinkDesc> reduceSinkDescs = new ArrayList<ReduceSinkDesc>();
+ for (int i = reduceSinks.size() - 1; i >= offset; i--)
+ reduceSinkDescs.add((ReduceSinkDesc) reduceSinks.get(i).getConf());
+
+ // get the object inspector for the join
+ // NOTE(review): fieldNames/types are computed here but never used below
+ List<String> fieldNames = new ArrayList<String>();
+ List<TypeInfo> types = new ArrayList<TypeInfo>();
+ for (int i = reduceSinks.size() - 1; i >= offset; i--) {
+ fieldNames.addAll(reduceSinkToFieldNames.get(reduceSinks.get(i)));
+ types.addAll(reduceSinkToTypes.get(reduceSinks.get(i)));
+ }
+
+ // get number of equality conjunctions in the final join condition
+ int size = reduceSinkDescs.get(0).getKeyCols().size();
+
+ // make up the join condition expression
+ List<ExprNodeDesc> joinConditionChildren = new ArrayList<ExprNodeDesc>();
+ for (int i = 0; i < size; i++) {
+ // create a join key pair
+ List<ExprNodeDesc> keyPair = new ArrayList<ExprNodeDesc>();
+ for (ReduceSinkDesc sink : reduceSinkDescs) {
+ keyPair.add(sink.getKeyCols().get(i));
+ }
+ // create a hive equal condition
+ ExprNodeDesc equality = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
+ new GenericUDFOPEqual(), keyPair);
+ // add the equal condition to the conjunction list
+ joinConditionChildren.add(equality);
+ }
+ // get final conjunction expression
+ ExprNodeDesc conjunct = null;
+
+ if (joinConditionChildren.size() > 1)
+ conjunct = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(),
+ joinConditionChildren);
+ else if (joinConditionChildren.size() == 1)
+ conjunct = joinConditionChildren.get(0);
+ else {
+ // no join equality condition: use a constant-true predicate
+ // (degenerates to a cross product)
+ conjunct = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, new Boolean(true));
+ }
+ // get an ILogicalExpression from hive's expression
+ Mutable<ILogicalExpression> expression = t.translateScalarFucntion(conjunct);
+
+ Mutable<ILogicalOperator> leftBranch = new MutableObject<ILogicalOperator>(
+ exchanges.get(exchanges.size() - 1));
+ Mutable<ILogicalOperator> rightBranch = new MutableObject<ILogicalOperator>(
+ exchanges.get(exchanges.size() - 2));
+ // get the join operator
+ if (conds.get(offset).getType() == JoinDesc.LEFT_OUTER_JOIN) {
+ currentRoot = new LeftOuterJoinOperator(expression);
+ Mutable<ILogicalOperator> temp = leftBranch;
+ leftBranch = rightBranch;
+ rightBranch = temp;
+ } else if (conds.get(offset).getType() == JoinDesc.RIGHT_OUTER_JOIN) {
+ // NOTE(review): RIGHT_OUTER maps to LeftOuterJoinOperator without
+ // swapping branches, unlike the LEFT_OUTER case -- confirm intended
+ currentRoot = new LeftOuterJoinOperator(expression);
+ } else
+ currentRoot = new InnerJoinOperator(expression);
+
+ currentRoot.getInputs().add(leftBranch);
+ currentRoot.getInputs().add(rightBranch);
+
+ // rewriteOperatorOutputSchema(variables, operator);
+ return currentRoot;
+ } else {
+ // get the child join operator and insert and one-to-one exchange
+ ILogicalOperator joinSrcOne = generateJoinTree(conds, exchanges, reduceSinks, offset + 1, t);
+ // joinSrcOne.addInput(childJoin);
+
+ ILogicalOperator currentRoot;
+
+ List<ReduceSinkDesc> reduceSinkDescs = new ArrayList<ReduceSinkDesc>();
+ for (int i = offset; i < offset + 2; i++)
+ reduceSinkDescs.add((ReduceSinkDesc) reduceSinks.get(i).getConf());
+
+ // get the object inspector for the join
+ // NOTE(review): fieldNames/types are computed here but never used below
+ List<String> fieldNames = new ArrayList<String>();
+ List<TypeInfo> types = new ArrayList<TypeInfo>();
+ for (int i = offset; i < reduceSinks.size(); i++) {
+ fieldNames.addAll(reduceSinkToFieldNames.get(reduceSinks.get(i)));
+ types.addAll(reduceSinkToTypes.get(reduceSinks.get(i)));
+ }
+
+ // get number of equality conjunctions in the final join condition
+ int size = reduceSinkDescs.get(0).getKeyCols().size();
+
+ // make up the join condition expression
+ List<ExprNodeDesc> joinConditionChildren = new ArrayList<ExprNodeDesc>();
+ for (int i = 0; i < size; i++) {
+ // create a join key pair
+ List<ExprNodeDesc> keyPair = new ArrayList<ExprNodeDesc>();
+ for (ReduceSinkDesc sink : reduceSinkDescs) {
+ keyPair.add(sink.getKeyCols().get(i));
+ }
+ // create a hive equal condition
+ ExprNodeDesc equality = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
+ new GenericUDFOPEqual(), keyPair);
+ // add the equal condition to the conjunction list
+ joinConditionChildren.add(equality);
+ }
+ // get final conjunction expression
+ ExprNodeDesc conjunct = null;
+
+ if (joinConditionChildren.size() > 1)
+ conjunct = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(),
+ joinConditionChildren);
+ else if (joinConditionChildren.size() == 1)
+ conjunct = joinConditionChildren.get(0);
+ else {
+ // no join equality condition: use a constant-true predicate
+ // (degenerates to a cross product)
+ conjunct = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, new Boolean(true));
+ }
+ // get an ILogicalExpression from hive's expression
+ Mutable<ILogicalExpression> expression = t.translateScalarFucntion(conjunct);
+
+ Mutable<ILogicalOperator> leftBranch = new MutableObject<ILogicalOperator>(joinSrcOne);
+ Mutable<ILogicalOperator> rightBranch = new MutableObject<ILogicalOperator>(exchanges.get(offset));
+
+ // get the join operator
+ if (conds.get(offset).getType() == JoinDesc.LEFT_OUTER_JOIN) {
+ currentRoot = new LeftOuterJoinOperator(expression);
+ Mutable<ILogicalOperator> temp = leftBranch;
+ leftBranch = rightBranch;
+ rightBranch = temp;
+ } else if (conds.get(offset).getType() == JoinDesc.RIGHT_OUTER_JOIN) {
+ // NOTE(review): RIGHT_OUTER maps to LeftOuterJoinOperator without
+ // swapping branches, unlike the LEFT_OUTER case -- confirm intended
+ currentRoot = new LeftOuterJoinOperator(expression);
+ } else
+ currentRoot = new InnerJoinOperator(expression);
+
+ // set the inputs from Algebricks join operator
+ // add the current table
+ currentRoot.getInputs().add(leftBranch);
+ currentRoot.getInputs().add(rightBranch);
+
+ return currentRoot;
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/LateralViewJoinVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/LateralViewJoinVisitor.java
new file mode 100644
index 0000000..cc19364
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/LateralViewJoinVisitor.java
@@ -0,0 +1,110 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+
+/**
+ * The lateral view join operator is used for FROM src LATERAL VIEW udtf()...
+ * This operator was implemented with the following operator DAG in mind.
+ * For a query such as
+ * SELECT pageid, adid.* FROM example_table LATERAL VIEW explode(adid_list) AS
+ * adid
+ * The top of the operator DAG will look similar to:
+ *
+ *                 [Table Scan]
+ *                      |
+ *             [Lateral View Forward]
+ *                   /      \
+ *        [Select](*)        [Select](adid_list)
+ *             |                    |
+ *             |                 [UDTF] (explode)
+ *              \                  /
+ *              [Lateral View Join]
+ *                      |
+ *            [Select] (pageid, adid.*)
+ *                      |
+ *                     ....
+ *
+ * Rows from the table scan operator are first sent to a lateral view forward
+ * operator that just forwards the row and marks the start of a LV. The select
+ * operator on the left picks all the columns while the select operator on the
+ * right picks only the columns needed by the UDTF.
+ * The output of select in the left branch and output of the UDTF in the right
+ * branch are then sent to the lateral view join (LVJ). In most cases, the UDTF
+ * will generate > 1 row for every row received from the TS, while the left
+ * select operator will generate only one. For each row output from the TS, the
+ * LVJ outputs all possible rows that can be created by joining the row from the
+ * left select and one of the rows output from the UDTF.
+ * Additional lateral views can be supported by adding a similar DAG after the
+ * previous LVJ operator.
+ */
+
+@SuppressWarnings("rawtypes")
+public class LateralViewJoinVisitor extends DefaultVisitor {
+
+ // UDTF descriptor captured while visiting the UDTFOperator branch; it is
+ // consumed (and reset to null) by the LateralViewJoinOperator visit below.
+ private UDTFDesc udtf;
+
+ // Algebricks parent refs accumulated across visits, one per Hive parent;
+ // the join is only emitted once all parents have been translated.
+ private List<Mutable<ILogicalOperator>> parents = new ArrayList<Mutable<ILogicalOperator>>();
+
+ /**
+ * Translate the Hive lateral-view join into an Algebricks unnest over the
+ * non-UDTF branch, using the captured UDTF as the unnest function.
+ * Returns null until every Hive parent has been visited.
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(LateralViewJoinOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException {
+
+ // defer translation until the last parent's visit arrives
+ parents.add(AlgebricksParentOperatorRef);
+ if (operator.getParentOperators().size() > parents.size()) {
+ return null;
+ }
+
+ // pick the branch opposite the UDTF as the unnest input.
+ // NOTE(review): this assumes exactly two parents and that the order of
+ // the accumulated Algebricks refs mirrors Hive's parent order — TODO confirm.
+ Operator parent0 = operator.getParentOperators().get(0);
+ ILogicalOperator parentOperator;
+ ILogicalExpression unnestArg;
+ if (parent0 instanceof UDTFOperator) {
+ List<LogicalVariable> unnestVars = new ArrayList<LogicalVariable>();
+ VariableUtilities.getLiveVariables(parents.get(1).getValue(), unnestVars);
+ // NOTE(review): only the first live variable is unnested — presumably
+ // the UDTF takes a single column argument; verify for multi-arg UDTFs.
+ unnestArg = new VariableReferenceExpression(unnestVars.get(0));
+ parentOperator = parents.get(1).getValue();
+ } else {
+ List<LogicalVariable> unnestVars = new ArrayList<LogicalVariable>();
+ VariableUtilities.getLiveVariables(parents.get(0).getValue(), unnestVars);
+ unnestArg = new VariableReferenceExpression(unnestVars.get(0));
+ parentOperator = parents.get(0).getValue();
+ }
+
+ // variable holding each row produced by the unnest (UDTF output)
+ LogicalVariable var = t.getVariable(udtf.toString(), TypeInfoFactory.unknownTypeInfo);
+
+ Mutable<ILogicalExpression> unnestExpr = t.translateUnnestFunction(udtf, new MutableObject<ILogicalExpression>(
+ unnestArg));
+ ILogicalOperator currentOperator = new UnnestOperator(var, unnestExpr);
+
+ // output schema = live variables of the chosen branch + the unnest var
+ List<LogicalVariable> outputVars = new ArrayList<LogicalVariable>();
+ VariableUtilities.getLiveVariables(parentOperator, outputVars);
+ outputVars.add(var);
+ currentOperator.getInputs().add(new MutableObject<ILogicalOperator>(parentOperator));
+
+ // reset the cross-visit state for the next lateral view in the plan
+ parents.clear();
+ udtf = null;
+ t.rewriteOperatorOutputSchema(outputVars, operator);
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+ /**
+ * Capture the UDTF descriptor; the actual unnest is built later when the
+ * lateral-view join operator is visited, so no operator is emitted here.
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(UDTFOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) {
+ Schema currentSchema = t.generateInputSchema(operator.getParentOperators().get(0));
+ udtf = (UDTFDesc) operator.getConf();
+
+ // populate the schema from upstream operator
+ operator.setSchema(operator.getParentOperators().get(0).getSchema());
+ List<LogicalVariable> latestOutputSchema = t.getVariablesFromSchema(currentSchema);
+ t.rewriteOperatorOutputSchema(latestOutputSchema, operator);
+ return null;
+ }
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/LimitVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/LimitVisitor.java
new file mode 100644
index 0000000..cc10f8f
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/LimitVisitor.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.HivesterixConstantValue;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+
+public class LimitVisitor extends DefaultVisitor {
+
+ /**
+ * Translate a Hive limit operator into an Algebricks LimitOperator that
+ * wraps the already-translated parent.
+ *
+ * @param operator the Hive limit operator
+ * @param AlgebricksParentOperatorRef the translated parent operator
+ * @param t the translation context
+ * @return reference to the new Algebricks limit operator
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(LimitOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) {
+ Schema currentSchema = t.generateInputSchema(operator.getParentOperators().get(0));
+
+ LimitDesc desc = (LimitDesc) operator.getConf();
+ int limit = desc.getLimit();
+ // Integer.valueOf uses the boxed-integer cache instead of always
+ // allocating a fresh Integer (the Integer(int) constructor is deprecated)
+ Integer limitValue = Integer.valueOf(limit);
+
+ // wrap the constant limit into an Algebricks constant expression
+ ILogicalExpression expr = new ConstantExpression(new HivesterixConstantValue(limitValue));
+ ILogicalOperator currentOperator = new edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator(
+ expr, true);
+ currentOperator.getInputs().add(AlgebricksParentOperatorRef);
+
+ // limit does not change the output columns: propagate the upstream schema
+ operator.setSchema(operator.getParentOperators().get(0).getSchema());
+ List<LogicalVariable> latestOutputSchema = t.getVariablesFromSchema(currentSchema);
+ t.rewriteOperatorOutputSchema(latestOutputSchema, operator);
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/MapJoinVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/MapJoinVisitor.java
new file mode 100644
index 0000000..4aba6a4
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/MapJoinVisitor.java
@@ -0,0 +1,171 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+
+@SuppressWarnings("rawtypes")
+public class MapJoinVisitor extends DefaultVisitor {
+
+ /**
+ * map a join operator (in Hive) to its already-translated parent operators
+ * (in Algebricks); the join is emitted only after all parents have visited
+ */
+ private HashMap<Operator, List<Mutable<ILogicalOperator>>> opMap = new HashMap<Operator, List<Mutable<ILogicalOperator>>>();
+
+ /**
+ * Translate a Hive map-join into an Algebricks inner join plus the
+ * surrounding assign/project operators. Returns null until every Hive
+ * parent branch has been translated.
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(MapJoinOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) {
+ // record this parent; defer translation until every parent has arrived
+ List<Operator<? extends Serializable>> joinSrc = operator.getParentOperators();
+ List<Mutable<ILogicalOperator>> parents = opMap.get(operator);
+ if (parents == null) {
+ parents = new ArrayList<Mutable<ILogicalOperator>>();
+ opMap.put(operator, parents);
+ }
+ parents.add(AlgebricksParentOperatorRef);
+ if (joinSrc.size() != parents.size())
+ return null;
+
+ ILogicalOperator currentOperator;
+ // make a map-join operator
+ // TODO: will have trouble for n-way joins
+ MapJoinDesc joinDesc = (MapJoinDesc) operator.getConf();
+
+ // join keys per input branch (keyed by branch tag byte)
+ Map<Byte, List<ExprNodeDesc>> keyMap = joinDesc.getKeys();
+ // get the projection expression (already re-written) from each source
+ // table
+ Map<Byte, List<ExprNodeDesc>> exprMap = joinDesc.getExprs();
+
+ int inputSize = operator.getParentOperators().size();
+ // get a list of reduce sink descs (input descs)
+
+ // get the parent operator
+ List<Mutable<ILogicalOperator>> parentOps = parents;
+
+ // collect the combined field names/types of all input branches
+ List<String> fieldNames = new ArrayList<String>();
+ List<TypeInfo> types = new ArrayList<TypeInfo>();
+ for (Operator ts : joinSrc) {
+ List<ColumnInfo> columns = ts.getSchema().getSignature();
+ for (ColumnInfo col : columns) {
+ fieldNames.add(col.getInternalName());
+ types.add(col.getType());
+ }
+ }
+
+ // get number of equality conjunctions in the final join condition
+ Set<Entry<Byte, List<ExprNodeDesc>>> keyEntries = keyMap.entrySet();
+ Iterator<Entry<Byte, List<ExprNodeDesc>>> entry = keyEntries.iterator();
+
+ int size = 0;
+ if (entry.hasNext())
+ size = entry.next().getValue().size();
+
+ // make up the join condition expression
+ List<ExprNodeDesc> joinConditionChildren = new ArrayList<ExprNodeDesc>();
+ for (int i = 0; i < size; i++) {
+ // create a join key pair: the i-th key of every input branch
+ List<ExprNodeDesc> keyPair = new ArrayList<ExprNodeDesc>();
+ for (int j = 0; j < inputSize; j++) {
+ keyPair.add(keyMap.get(Byte.valueOf((byte) j)).get(i));
+ }
+ // create a hive equal condition
+ ExprNodeDesc equality = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
+ new GenericUDFOPEqual(), keyPair);
+ // add the equal condition to the conjunction list
+ joinConditionChildren.add(equality);
+ }
+ // get final conjunction expression
+ ExprNodeDesc conjunct = null;
+
+ if (joinConditionChildren.size() > 1)
+ conjunct = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(),
+ joinConditionChildren);
+ else if (joinConditionChildren.size() == 1)
+ conjunct = joinConditionChildren.get(0);
+ else {
+ // there is no join equality condition, full outer join
+ conjunct = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, new Boolean(true));
+ }
+ // get an ILogicalExpression from hive's expression
+ Mutable<ILogicalExpression> expression = t.translateScalarFucntion(conjunct);
+
+ // materialize the key expressions of each branch into variables.
+ // NOTE(review): only the first iteration fills `left`; every later one
+ // fills `right`, and neither list is read afterwards — consistent with
+ // the n-way TODO above; verify before relying on these lists.
+ ArrayList<LogicalVariable> left = new ArrayList<LogicalVariable>();
+ ArrayList<LogicalVariable> right = new ArrayList<LogicalVariable>();
+
+ Set<Entry<Byte, List<ExprNodeDesc>>> kentries = keyMap.entrySet();
+ Iterator<Entry<Byte, List<ExprNodeDesc>>> kiterator = kentries.iterator();
+ int iteration = 0;
+ ILogicalOperator assignOperator = null;
+ while (kiterator.hasNext()) {
+ List<ExprNodeDesc> outputExprs = kiterator.next().getValue();
+
+ if (iteration == 0)
+ assignOperator = t.getAssignOperator(AlgebricksParentOperatorRef, outputExprs, left);
+ else
+ assignOperator = t.getAssignOperator(AlgebricksParentOperatorRef, outputExprs, right);
+
+ // chain each non-trivial assign on top of the previous operator
+ if (assignOperator != null) {
+ currentOperator = assignOperator;
+ AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
+ }
+ iteration++;
+ }
+
+ List<Mutable<ILogicalOperator>> inputs = parentOps;
+
+ // get the join operator
+ currentOperator = new InnerJoinOperator(expression);
+
+ // set the inputs of the Algebricks join operator
+ for (Mutable<ILogicalOperator> input : inputs)
+ currentOperator.getInputs().add(input);
+ AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
+
+ // add assign and project operator
+ // output variables
+ ArrayList<LogicalVariable> variables = new ArrayList<LogicalVariable>();
+ Set<Entry<Byte, List<ExprNodeDesc>>> entries = exprMap.entrySet();
+ Iterator<Entry<Byte, List<ExprNodeDesc>>> iterator = entries.iterator();
+ while (iterator.hasNext()) {
+ List<ExprNodeDesc> outputExprs = iterator.next().getValue();
+ assignOperator = t.getAssignOperator(AlgebricksParentOperatorRef, outputExprs, variables);
+
+ if (assignOperator != null) {
+ currentOperator = assignOperator;
+ AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
+ }
+ }
+
+ // final projection onto the join's output variables
+ currentOperator = new ProjectOperator(variables);
+ currentOperator.getInputs().add(AlgebricksParentOperatorRef);
+ t.rewriteOperatorOutputSchema(variables, operator);
+ // opMap.clear();
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ }
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/ProjectVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/ProjectVisitor.java
new file mode 100644
index 0000000..eb0922f
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/ProjectVisitor.java
@@ -0,0 +1,56 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+
+public class ProjectVisitor extends DefaultVisitor {
+
+ /**
+ * Translate a Hive select (projection) operator into an Algebricks
+ * assign (when expressions must be evaluated) followed by a project.
+ * Returns null when there is nothing to project.
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(SelectOperator operator, Mutable<ILogicalOperator> AlgebricksParentOperator,
+ Translator t) {
+
+ SelectDesc selectDesc = (SelectDesc) operator.getConf();
+ if (selectDesc == null) {
+ return null;
+ }
+
+ List<ExprNodeDesc> columnExprs = selectDesc.getColList();
+ if (columnExprs == null) {
+ return null;
+ }
+
+ // rewrite every projected expression against the translator's variable map
+ for (ExprNodeDesc columnExpr : columnExprs) {
+ t.rewriteExpression(columnExpr);
+ }
+
+ // introduce an assign operator when any expression needs evaluation
+ ArrayList<LogicalVariable> outputVariables = new ArrayList<LogicalVariable>();
+ ILogicalOperator assign = t.getAssignOperator(AlgebricksParentOperator, columnExprs, outputVariables);
+ Mutable<ILogicalOperator> projectInput = AlgebricksParentOperator;
+ if (assign != null) {
+ projectInput = new MutableObject<ILogicalOperator>(assign);
+ }
+
+ // the project narrows the schema down to the selected variables
+ ILogicalOperator project = new ProjectOperator(outputVariables);
+ project.getInputs().add(projectInput);
+ t.rewriteOperatorOutputSchema(outputVariables, operator);
+ return new MutableObject<ILogicalOperator>(project);
+ }
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/SortVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/SortVisitor.java
new file mode 100644
index 0000000..325b632
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/SortVisitor.java
@@ -0,0 +1,113 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+
+public class SortVisitor extends DefaultVisitor {
+
+ /**
+ * Translate a Hive reduce-sink that implements a total ORDER BY (single
+ * reducer, non-empty sort keys, extract operator downstream) into an
+ * Algebricks OrderOperator, wrapped with assign/project operators as
+ * needed. All other reduce-sinks are left for other visitors (null).
+ */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Mutable<ILogicalOperator> visit(ReduceSinkOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException {
+ ReduceSinkDesc desc = (ReduceSinkDesc) operator.getConf();
+ Operator downStream = (Operator) operator.getChildOperators().get(0);
+ List<ExprNodeDesc> keys = desc.getKeyCols();
+ // only handle the ORDER BY pattern; bail out otherwise
+ if (!(downStream instanceof ExtractOperator && desc.getNumReducers() == 1 && keys.size() > 0)) {
+ return null;
+ }
+
+ List<ExprNodeDesc> schema = new ArrayList<ExprNodeDesc>();
+ List<ExprNodeDesc> values = desc.getValueCols();
+ List<ExprNodeDesc> partitionCols = desc.getPartitionCols();
+ // rewrite all expressions against the translator's variable mappings
+ for (ExprNodeDesc key : keys) {
+ t.rewriteExpression(key);
+ }
+ for (ExprNodeDesc value : values) {
+ t.rewriteExpression(value);
+ }
+ for (ExprNodeDesc col : partitionCols) {
+ t.rewriteExpression(col);
+ }
+
+ // build (order, key-expression) pairs; '+' in the order string means
+ // ascending, anything else descending
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> pairs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+ char[] orders = desc.getOrder().toCharArray();
+ int i = 0;
+ for (ExprNodeDesc key : keys) {
+ Mutable<ILogicalExpression> expr = t.translateScalarFucntion(key);
+ IOrder order = orders[i] == '+' ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER;
+
+ Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<IOrder, Mutable<ILogicalExpression>>(order, expr);
+ pairs.add(pair);
+ i++;
+ }
+
+ // remember the input variables so generated sort-key columns can be
+ // projected away again afterwards
+ ArrayList<LogicalVariable> inputVariables = new ArrayList<LogicalVariable>();
+ VariableUtilities.getProducedVariables(AlgebricksParentOperatorRef.getValue(), inputVariables);
+
+ // materialize computed sort keys into variables when necessary
+ ArrayList<LogicalVariable> keyVariables = new ArrayList<LogicalVariable>();
+ ILogicalOperator currentOperator;
+ ILogicalOperator assignOp = t.getAssignOperator(AlgebricksParentOperatorRef, keys, keyVariables);
+ if (assignOp != null) {
+ currentOperator = assignOp;
+ AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+ // (dead code removed: an OrderColumn[] built from keyVariables/pairs
+ // was computed here but never used)
+
+ // handle order operator
+ currentOperator = new OrderOperator(pairs);
+ currentOperator.getInputs().add(AlgebricksParentOperatorRef);
+ AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
+
+ // project back, remove generated sort-key columns if any
+ if (assignOp != null) {
+ currentOperator = new ProjectOperator(inputVariables);
+ currentOperator.getInputs().add(AlgebricksParentOperatorRef);
+ AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+ /**
+ * a special rule for hive's order by: the output schema of the reduce
+ * sink operator only contains the value columns
+ */
+ for (ExprNodeDesc value : values) {
+ schema.add(value);
+ }
+
+ // rebind the value columns as this operator's output schema
+ ArrayList<LogicalVariable> variables = new ArrayList<LogicalVariable>();
+ ILogicalOperator assignOperator = t.getAssignOperator(AlgebricksParentOperatorRef, schema, variables);
+ t.rewriteOperatorOutputSchema(variables, operator);
+
+ if (assignOperator != null) {
+ currentOperator = assignOperator;
+ AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
+ }
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ }
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/TableScanWriteVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/TableScanWriteVisitor.java
new file mode 100644
index 0000000..3af1832
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/TableScanWriteVisitor.java
@@ -0,0 +1,135 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveDataSink;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveDataSource;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveMetaDataProvider;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+
+public class TableScanWriteVisitor extends DefaultVisitor {
+
+ /**
+ * map from table alias to its partition descriptor
+ */
+ private HashMap<String, PartitionDesc> aliasToPathMap;
+
+ /**
+ * map from partition desc to data source; filled by the table-scan visit
+ * and handed to the metadata provider in the file-sink visit
+ */
+ private HashMap<PartitionDesc, IDataSource<PartitionDesc>> dataSourceMap = new HashMap<PartitionDesc, IDataSource<PartitionDesc>>();
+
+ /**
+ * constructor
+ *
+ * @param aliasToPathMap
+ *            map from table alias to partition descriptor
+ */
+ public TableScanWriteVisitor(HashMap<String, PartitionDesc> aliasToPathMap) {
+ this.aliasToPathMap = aliasToPathMap;
+ }
+
+ /**
+ * Translate a Hive table scan into an Algebricks data-source scan rooted
+ * at an empty tuple source.
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(TableScanOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ TableScanDesc desc = (TableScanDesc) operator.getConf();
+ if (desc == null) {
+ // no table descriptor: just propagate the parent's live variables
+ // as this operator's output schema and emit nothing
+ List<LogicalVariable> schema = new ArrayList<LogicalVariable>();
+ VariableUtilities.getLiveVariables(AlgebricksParentOperator.getValue(), schema);
+ t.rewriteOperatorOutputSchema(schema, operator);
+ return null;
+ }
+
+ // drop virtual columns (iterate backwards so removal is safe)
+ List<ColumnInfo> columns = operator.getSchema().getSignature();
+ for (int i = columns.size() - 1; i >= 0; i--)
+ if (columns.get(i).getIsVirtualCol() == true)
+ columns.remove(i);
+
+ // start with empty tuple operator
+ List<TypeInfo> types = new ArrayList<TypeInfo>();
+ ArrayList<LogicalVariable> variables = new ArrayList<LogicalVariable>();
+ List<String> names = new ArrayList<String>();
+ for (ColumnInfo column : columns) {
+ types.add(column.getType());
+
+ // variables are keyed by "alias.columnName"
+ LogicalVariable var = t.getVariableFromFieldName(column.getTabAlias() + "." + column.getInternalName());
+ LogicalVariable varNew;
+
+ if (var != null) {
+ // a variable for this field already exists — presumably the
+ // table is scanned more than once; mint a fresh variable
+ // disambiguated by this operator's identity and redirect
+ // existing references to it (TODO confirm replaceVariable scope)
+ varNew = t.getVariable(column.getTabAlias() + "." + column.getInternalName() + operator.toString(),
+ column.getType());
+ t.replaceVariable(var, varNew);
+ var = varNew;
+ } else
+ var = t.getNewVariable(column.getTabAlias() + "." + column.getInternalName(), column.getType());
+
+ variables.add(var);
+ names.add(column.getInternalName());
+ }
+ Schema currentSchema = new Schema(names, types);
+
+ // resolve the table partition and wrap it as an Algebricks data source
+ String alias = desc.getAlias();
+ PartitionDesc partDesc = aliasToPathMap.get(alias);
+ IDataSource<PartitionDesc> dataSource = new HiveDataSource<PartitionDesc>(partDesc, currentSchema.getSchema());
+ ILogicalOperator currentOperator = new DataSourceScanOperator(variables, dataSource);
+
+ // set empty tuple source operator
+ ILogicalOperator ets = new EmptyTupleSourceOperator();
+ currentOperator.getInputs().add(new MutableObject<ILogicalOperator>(ets));
+
+ // setup data source
+ dataSourceMap.put(partDesc, dataSource);
+ t.rewriteOperatorOutputSchema(variables, operator);
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+ /**
+ * Translate a terminal Hive file sink into an Algebricks write operator
+ * and register the metadata provider with the translator. Non-terminal
+ * file sinks (those with children) are ignored.
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(FileSinkOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) {
+
+ if (hiveOperator.getChildOperators() != null && hiveOperator.getChildOperators().size() > 0)
+ return null;
+
+ Schema currentSchema = t.generateInputSchema(hiveOperator.getParentOperators().get(0));
+
+ // one variable-reference expression per output column
+ IDataSink sink = new HiveDataSink(hiveOperator, currentSchema.getSchema());
+ List<Mutable<ILogicalExpression>> exprList = new ArrayList<Mutable<ILogicalExpression>>();
+ for (String column : currentSchema.getNames()) {
+ exprList.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(t.getVariable(column))));
+ }
+
+ ILogicalOperator currentOperator = new WriteOperator(exprList, sink);
+ if (AlgebricksParentOperator != null) {
+ currentOperator.getInputs().add(AlgebricksParentOperator);
+ }
+
+ // hand the collected data sources to the metadata provider
+ IMetadataProvider<PartitionDesc, Object> metaData = new HiveMetaDataProvider<PartitionDesc, Object>(
+ hiveOperator, currentSchema, dataSourceMap);
+ t.setMetadataProvider(metaData);
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ }
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/UnionVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/UnionVisitor.java
new file mode 100644
index 0000000..96b9463
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/UnionVisitor.java
@@ -0,0 +1,62 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.DefaultVisitor;
+import edu.uci.ics.hivesterix.logical.plan.visitor.base.Translator;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+
+public class UnionVisitor extends DefaultVisitor {
+
+ // Algebricks parent refs accumulated across visits, one per Hive parent
+ List<Mutable<ILogicalOperator>> parents = new ArrayList<Mutable<ILogicalOperator>>();
+
+ /**
+ * Translate a Hive union into an Algebricks UnionAllOperator. Returns
+ * null until every Hive parent has been visited.
+ * NOTE(review): variable mapping only considers the first two branches
+ * (get(0)/get(1)) and assumes their used-variable lists are positionally
+ * aligned and of equal size — TODO confirm for >2-way unions.
+ */
+ @Override
+ public Mutable<ILogicalOperator> visit(UnionOperator operator, Mutable<ILogicalOperator> AlgebricksParentOperator,
+ Translator t) throws AlgebricksException {
+
+ // defer translation until the last parent's visit arrives
+ parents.add(AlgebricksParentOperator);
+ if (operator.getParentOperators().size() > parents.size()) {
+ return null;
+ }
+
+ List<LogicalVariable> leftVars = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> rightVars = new ArrayList<LogicalVariable>();
+
+ VariableUtilities.getUsedVariables(parents.get(0).getValue(), leftVars);
+ VariableUtilities.getUsedVariables(parents.get(1).getValue(), rightVars);
+
+ // one (leftVar, rightVar, unionVar) triple per output column
+ List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> triples = new ArrayList<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>();
+ List<LogicalVariable> unionVars = new ArrayList<LogicalVariable>();
+
+ for (int i = 0; i < leftVars.size(); i++) {
+ // mint a fresh variable for the merged column and redirect both
+ // branch variables to it
+ LogicalVariable unionVar = t.getVariable(
+ leftVars.get(i).getId() + "union" + AlgebricksParentOperator.hashCode(),
+ TypeInfoFactory.unknownTypeInfo);
+ unionVars.add(unionVar);
+ Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple = new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(
+ leftVars.get(i), rightVars.get(i), unionVar);
+ t.replaceVariable(leftVars.get(i), unionVar);
+ t.replaceVariable(rightVars.get(i), unionVar);
+ triples.add(triple);
+ }
+ ILogicalOperator currentOperator = new edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator(
+ triples);
+ // all accumulated parents become inputs of the union-all
+ for (Mutable<ILogicalOperator> parent : parents)
+ currentOperator.getInputs().add(parent);
+
+ t.rewriteOperatorOutputSchema(unionVars, operator);
+ // reset cross-visit state for the next union in the plan
+ parents.clear();
+ return new MutableObject<ILogicalOperator>(currentOperator);
+ }
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/base/DefaultVisitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/base/DefaultVisitor.java
new file mode 100644
index 0000000..d298553
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/base/DefaultVisitor.java
@@ -0,0 +1,149 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor.base;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hadoop.hive.ql.exec.CollectOperator;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+
+/**
+ * A default no-op implementation of {@link Visitor}: every visit method
+ * returns null, i.e., contributes no Algebricks operator. Concrete visitors
+ * extend this class and override only the Hive operators they translate.
+ *
+ * @author yingyib
+ */
+public class DefaultVisitor implements Visitor {
+
+ @Override
+ public Mutable<ILogicalOperator> visit(CollectOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(JoinOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(ExtractOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(MapJoinOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(SMBMapJoinOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(FileSinkOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(ReduceSinkOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(FilterOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(ForwardOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(GroupByOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(LateralViewForwardOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(LateralViewJoinOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(LimitOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(MapOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(ScriptOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(SelectOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(TableScanOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperator, Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(UDTFOperator operator, Mutable<ILogicalOperator> AlgebricksParentOperator,
+ Translator t) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Mutable<ILogicalOperator> visit(UnionOperator operator, Mutable<ILogicalOperator> AlgebricksParentOperator,
+ Translator t) throws AlgebricksException {
+ return null;
+ }
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/base/Translator.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/base/Translator.java
new file mode 100644
index 0000000..8aa139a
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/base/Translator.java
@@ -0,0 +1,170 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor.base;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+@SuppressWarnings("rawtypes")
+public interface Translator {
+
+ /**
+ * generate the input schema of an operator
+ *
+ * @param operator
+ * @return the input schema of the given operator
+ */
+ public Schema generateInputSchema(Operator operator);
+
+ /**
+ * rewrite the names of output columns for downstream (future) expression
+ * evaluators to use
+ *
+ * @param vars
+ * @param operator
+ */
+ public void rewriteOperatorOutputSchema(List<LogicalVariable> vars, Operator operator);
+
+ /**
+ * rewrite the names of output columns for downstream (future) expression
+ * evaluators to use
+ *
+ * @param operator
+ */
+ public void rewriteOperatorOutputSchema(Operator operator);
+
+ /**
+ * rewrite an expression and substitute variables
+ *
+ * @param expr
+ * hive expression
+ */
+ public void rewriteExpression(ExprNodeDesc expr);
+
+ /**
+ * rewrite an expression and substitute variables
+ *
+ * @param expr
+ * hive expression
+ */
+ public void rewriteExpressionPartial(ExprNodeDesc expr);
+
+ /**
+ * get an assign operator as a child of parent
+ *
+ * @param parent
+ * @param cols
+ * @param variables
+ * @return the newly created assign operator
+ */
+ public ILogicalOperator getAssignOperator(Mutable<ILogicalOperator> parent, List<ExprNodeDesc> cols,
+ ArrayList<LogicalVariable> variables);
+
+ /**
+ * get type for a logical variable
+ *
+ * @param var
+ * @return type info
+ */
+ public TypeInfo getType(LogicalVariable var);
+
+ /**
+ * translate an expression from hive to Algebricks
+ * NOTE: the method name misspells "Function"; kept as-is for API compatibility.
+ * @param desc
+ * @return the translated Algebricks expression
+ */
+ public Mutable<ILogicalExpression> translateScalarFucntion(ExprNodeDesc desc);
+
+ /**
+ * translate an aggregation from hive to Algebricks
+ *
+ * @param aggregateDesc
+ * @return the translated Algebricks aggregate expression
+ */
+ public Mutable<ILogicalExpression> translateAggregation(AggregationDesc aggregateDesc);
+
+ /**
+ * translate unnesting (UDTF) function expression
+ *
+ * @param udtfDesc
+ * @return the translated Algebricks unnest expression
+ */
+ public Mutable<ILogicalExpression> translateUnnestFunction(UDTFDesc udtfDesc, Mutable<ILogicalExpression> argument);
+
+ /**
+ * get variable from a schema
+ *
+ * @param schema
+ * @return the variables of the schema
+ */
+ public List<LogicalVariable> getVariablesFromSchema(Schema schema);
+
+ /**
+ * get variable from name
+ *
+ * @param name
+ * @return the variable for the given name
+ */
+ public LogicalVariable getVariable(String name);
+
+ /**
+ * get variable from field name
+ *
+ * @param name
+ * @return the variable for the given field name
+ */
+ public LogicalVariable getVariableFromFieldName(String name);
+
+ /**
+ * get variable from name, type
+ *
+ * @param fieldName
+ * @param type
+ * @return the variable for the given field name and type
+ */
+ public LogicalVariable getVariable(String fieldName, TypeInfo type);
+
+ /**
+ * get new variable from name, type
+ *
+ * @param fieldName
+ * @param type
+ * @return a new variable for the given field name and type
+ */
+ public LogicalVariable getNewVariable(String fieldName, TypeInfo type);
+
+ /**
+ * set the metadata provider
+ *
+ * @param metadata
+ */
+ public void setMetadataProvider(IMetadataProvider<PartitionDesc, Object> metadata);
+
+ /**
+ * get the metadata provider
+ *
+ * @return the current metadata provider
+ */
+ public IMetadataProvider<PartitionDesc, Object> getMetadataProvider();
+
+ /**
+ * replace the variable
+ *
+ * @param oldVar
+ * @param newVar
+ */
+ public void replaceVariable(LogicalVariable oldVar, LogicalVariable newVar);
+
+}
diff --git a/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/base/Visitor.java b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/base/Visitor.java
new file mode 100644
index 0000000..11ae357
--- /dev/null
+++ b/hivesterix/hivesterix-translator/src/main/java/edu/uci/ics/hivesterix/logical/plan/visitor/base/Visitor.java
@@ -0,0 +1,93 @@
+package edu.uci.ics.hivesterix.logical.plan.visitor.base;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hadoop.hive.ql.exec.CollectOperator;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+
+/**
+ * Visitor over Hive operators that translates each supported Hive operator
+ * into an Algebricks logical operator. Each visit method receives a reference
+ * to the Algebricks operator already produced for the Hive operator's parent,
+ * together with the shared {@link Translator} context, and returns a reference
+ * to the resulting Algebricks operator, or null when no operator is produced
+ * (e.g., when the visitor is still waiting for more input branches).
+ */
+public interface Visitor {
+
+ public Mutable<ILogicalOperator> visit(CollectOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(JoinOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(ExtractOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(MapJoinOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(SMBMapJoinOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(FilterOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(ForwardOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(GroupByOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(LateralViewForwardOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(LateralViewJoinOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(LimitOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(MapOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(ScriptOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(SelectOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(TableScanOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(FileSinkOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(ReduceSinkOperator hiveOperator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(UDTFOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+
+ public Mutable<ILogicalOperator> visit(UnionOperator operator,
+ Mutable<ILogicalOperator> AlgebricksParentOperatorRef, Translator t) throws AlgebricksException;
+}