cross merge fullstack_release_candidate into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk/fullstack@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix-runtime/pom.xml b/hivesterix-runtime/pom.xml
new file mode 100644
index 0000000..24e8c11
--- /dev/null
+++ b/hivesterix-runtime/pom.xml
@@ -0,0 +1,371 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>edu.uci.ics.hyracks</groupId>
+	<artifactId>hivesterix-runtime</artifactId>
+	<version>0.2.3-SNAPSHOT</version>
+	<name>hivesterix-runtime</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>javax.servlet</groupId>
+			<artifactId>servlet-api</artifactId>
+			<version>2.5</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.8.1</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>args4j</groupId>
+			<artifactId>args4j</artifactId>
+			<version>2.0.12</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.json</groupId>
+			<artifactId>json</artifactId>
+			<version>20090211</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-server</artifactId>
+			<version>8.0.0.M1</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-servlet</artifactId>
+			<version>8.0.0.M1</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>jline</groupId>
+			<artifactId>jline</artifactId>
+			<version>0.9.94</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.datanucleus</groupId>
+			<artifactId>datanucleus-core</artifactId>
+			<version>2.0.3</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.datanucleus</groupId>
+			<artifactId>datanucleus-connectionpool</artifactId>
+			<version>2.0.3</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.datanucleus</groupId>
+			<artifactId>datanucleus-enhancer</artifactId>
+			<version>2.0.3</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.datanucleus</groupId>
+			<artifactId>datanucleus-rdbms</artifactId>
+			<version>2.0.3</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>commons-dbcp</groupId>
+			<artifactId>commons-dbcp</artifactId>
+			<version>1.4</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>commons-pool</groupId>
+			<artifactId>commons-pool</artifactId>
+			<version>1.5.4</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>commons-collections</groupId>
+			<artifactId>commons-collections</artifactId>
+			<version>3.2.1</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>commons-lang</groupId>
+			<artifactId>commons-lang</artifactId>
+			<version>2.4</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>javax</groupId>
+			<artifactId>jdo2-api</artifactId>
+			<version>2.3-ec</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.facebook</groupId>
+			<artifactId>libfb303</artifactId>
+			<version>0.5.0</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.thrift</groupId>
+			<artifactId>libthrift</artifactId>
+			<version>0.5.0</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>cli</artifactId>
+			<version>1.2</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.15</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.antlr</groupId>
+			<artifactId>antlr-runtime</artifactId>
+			<version>3.0.1</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop.hive</groupId>
+			<artifactId>hive-cli</artifactId>
+			<version>0.7.0</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop.hive</groupId>
+			<artifactId>hive-common</artifactId>
+			<version>0.7.0</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop.hive</groupId>
+			<artifactId>hive-exec</artifactId>
+			<version>0.7.0</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop.hive</groupId>
+			<artifactId>hive-hwi</artifactId>
+			<version>0.7.0</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop.hive</groupId>
+			<artifactId>hive-jdbc</artifactId>
+			<version>0.7.0</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop.hive</groupId>
+			<artifactId>hive-metastore</artifactId>
+			<version>0.7.0</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop.hive</groupId>
+			<artifactId>hive-service</artifactId>
+			<version>0.7.0</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop.hive</groupId>
+			<artifactId>hive-shims</artifactId>
+			<version>0.7.0</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop.hive</groupId>
+			<artifactId>hive-serde</artifactId>
+			<version>0.7.0</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>1.6.1</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>commons-cli</groupId>
+			<artifactId>commons-cli</artifactId>
+			<version>1.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.6.1</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>commons-logging</groupId>
+			<artifactId>commons-logging</artifactId>
+			<version>1.1.1</version>
+			<type>jar</type>
+			<classifier>api</classifier>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>r06</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.antlr</groupId>
+			<artifactId>stringtemplate</artifactId>
+			<version>3.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.derby</groupId>
+			<artifactId>derby</artifactId>
+			<version>10.8.1.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase</artifactId>
+			<version>0.90.3</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>algebricks-compiler</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-cc</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-control-nc</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hivesterix-serde</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hivesterix-common</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.7</source>
+					<target>1.7</target>
+					<encoding>UTF-8</encoding>
+					<fork>true</fork>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>patch</id>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<phase>package</phase>
+						<configuration>
+							<classifier>patch</classifier>
+							<finalName>a-hive-runtime</finalName>
+							<includes>
+								<include>**/org/apache/**</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+	<repositories>
+		<repository>
+			<releases>
+				<enabled>true</enabled>
+				<updatePolicy>always</updatePolicy>
+				<checksumPolicy>warn</checksumPolicy>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+				<updatePolicy>always</updatePolicy>
+				<checksumPolicy>fail</checksumPolicy>
+			</snapshots>
+			<id>third-party</id>
+			<url>http://obelix.ics.uci.edu/nexus/content/repositories/third-party</url>
+		</repository>
+		<repository>
+			<releases>
+				<enabled>true</enabled>
+				<updatePolicy>always</updatePolicy>
+				<checksumPolicy>warn</checksumPolicy>
+			</releases>
+			<snapshots>
+				<enabled>true</enabled>
+				<updatePolicy>always</updatePolicy>
+				<checksumPolicy>fail</checksumPolicy>
+			</snapshots>
+			<id>hyracks-public-release</id>
+			<url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-public-releases</url>
+		</repository>
+	</repositories>
+</project>
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AbstractExpressionEvaluator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AbstractExpressionEvaluator.java
new file mode 100644
index 0000000..ad02239
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AbstractExpressionEvaluator.java
@@ -0,0 +1,169 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractExpressionEvaluator implements ICopyEvaluator {
+
+    private List<ICopyEvaluator> children;
+
+    private ExprNodeEvaluator evaluator;
+
+    private IDataOutputProvider out;
+
+    private ObjectInspector inspector;
+
+    /**
+     * output object inspector
+     */
+    private ObjectInspector outputInspector;
+
+    /**
+     * cached row object
+     */
+    private LazyObject<? extends ObjectInspector> cachedRowObject;
+
+    /**
+     * serializer/deserializer for lazy object
+     */
+    private SerDe lazySer;
+
+    /**
+     * data output
+     */
+    DataOutput dataOutput;
+
+    public AbstractExpressionEvaluator(ExprNodeEvaluator hiveEvaluator, ObjectInspector oi, IDataOutputProvider output)
+            throws AlgebricksException {
+        evaluator = hiveEvaluator;
+        out = output;
+        inspector = oi;
+        dataOutput = out.getDataOutput();
+    }
+
+    protected ObjectInspector getRowInspector() {
+        return null;
+    }
+
+    protected IDataOutputProvider getIDataOutputProvider() {
+        return out;
+    }
+
+    protected ExprNodeEvaluator getHiveEvaluator() {
+        return evaluator;
+    }
+
+    public ObjectInspector getObjectInspector() {
+        return inspector;
+    }
+
+    @Override
+    public void evaluate(IFrameTupleReference r) throws AlgebricksException {
+        // initialize hive evaluator
+        try {
+            if (outputInspector == null)
+                outputInspector = evaluator.initialize(inspector);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException(e.getMessage());
+        }
+
+        readIntoCache(r);
+        try {
+            Object result = evaluator.evaluate(cachedRowObject);
+
+            // if (result == null) {
+            // result = evaluator.evaluate(cachedRowObject);
+            //
+            // // check if result is null
+            //
+            // String errorMsg = "serialize null object in  \n output " +
+            // outputInspector.toString() + " \n input "
+            // + inspector.toString() + "\n ";
+            // errorMsg += "";
+            // List<Object> columns = ((StructObjectInspector)
+            // inspector).getStructFieldsDataAsList(cachedRowObject);
+            // for (Object column : columns) {
+            // errorMsg += column.toString() + " ";
+            // }
+            // errorMsg += "\n";
+            // Log.info(errorMsg);
+            // System.out.println(errorMsg);
+            // // result = new BooleanWritable(true);
+            // throw new IllegalStateException(errorMsg);
+            // }
+
+            serializeResult(result);
+        } catch (HiveException e) {
+            e.printStackTrace();
+            throw new AlgebricksException(e.getMessage());
+        } catch (IOException e) {
+            e.printStackTrace();
+            throw new AlgebricksException(e.getMessage());
+        }
+    }
+
+    /**
+     * serialize the result
+     *
+     * @param result
+     *            the evaluation result
+     * @throws IOException
+     * @throws AlgebricksException
+     */
+    private void serializeResult(Object result) throws IOException, AlgebricksException {
+        if (lazySer == null)
+            lazySer = new LazySerDe();
+
+        try {
+            BytesWritable outputWritable = (BytesWritable) lazySer.serialize(result, outputInspector);
+            dataOutput.write(outputWritable.getBytes(), 0, outputWritable.getLength());
+        } catch (SerDeException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    /**
+     * bind the tuple reference to the cached row object
+     *
+     * @param r
+     */
+    private void readIntoCache(IFrameTupleReference r) {
+        if (cachedRowObject == null)
+            cachedRowObject = (LazyObject<? extends ObjectInspector>) LazyFactory.createLazyObject(inspector);
+        cachedRowObject.init(r);
+    }
+
+    /**
+     * set a list of children of this evaluator
+     *
+     * @param children
+     */
+    public void setChildren(List<ICopyEvaluator> children) {
+        this.children = children;
+    }
+
+    public void addChild(ICopyEvaluator child) {
+        if (children == null)
+            children = new ArrayList<ICopyEvaluator>();
+        children.add(child);
+    }
+
+}
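
How a concrete subclass gets driven at runtime: for every input tuple, evaluate() lazily initializes the wrapped Hive ExprNodeEvaluator, binds the frame tuple to the reusable lazy row, evaluates, and appends the LazySerDe-serialized result to the operator's output. A minimal sketch of that per-tuple loop, assuming a hypothetical tuple source ("cursor") and row inspector ("rowInspector"); ArrayBackedValueStorage is the stock Hyracks IDataOutputProvider, and ColumnExpressionEvaluator is the concrete subclass added later in this patch:

    // sketch only: 'cursor' and 'rowInspector' are stand-ins, not part of this patch
    ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
    ICopyEvaluator eval = new ColumnExpressionEvaluator(
            new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "col0", "t", false),
            rowInspector, storage);
    while (cursor.hasNext()) {
        IFrameTupleReference tuple = cursor.next();
        storage.reset();      // rewind the per-tuple output buffer
        eval.evaluate(tuple); // the serialized value lands in storage
    }
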
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregationFunctionEvaluator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregationFunctionEvaluator.java
new file mode 100644
index 0000000..e500376
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregationFunctionEvaluator.java
@@ -0,0 +1,224 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class AggregationFunctionEvaluator implements ICopyAggregateFunction {
+
+    /**
+     * the mode of aggregation function
+     */
+    private GenericUDAFEvaluator.Mode mode;
+
+    /**
+     * an array of evaluators
+     */
+    private ExprNodeEvaluator[] evaluators;
+
+    /**
+     * udaf evaluator partial
+     */
+    private GenericUDAFEvaluator udafPartial;
+
+    /**
+     * udaf evaluator complete
+     */
+    private GenericUDAFEvaluator udafComplete;
+
+    /**
+     * cached parameter objects
+     */
+    private Object[] cachedParameters;
+
+    /**
+     * cached row objects
+     */
+    private LazyObject<? extends ObjectInspector> cachedRowObject;
+
+    /**
+     * the output channel
+     */
+    private DataOutput out;
+
+    /**
+     * aggregation buffer
+     */
+    private AggregationBuffer aggBuffer;
+
+    /**
+     * we only use lazy serde to do serialization
+     */
+    private SerDe lazySer;
+
+    /**
+     * the output object inspector for this aggregation function
+     */
+    private ObjectInspector outputInspector;
+
+    /**
+     * the output object inspector for the partial result of this aggregation function
+     */
+    private ObjectInspector outputInspectorPartial;
+
+    /**
+     * parameter inspectors
+     */
+    private ObjectInspector[] parameterInspectors;
+
+    /**
+     * make sure the aggregation function creates as few objects as possible
+     *
+     * @param aggMode
+     * @param oi
+     * @param output
+     */
+    public AggregationFunctionEvaluator(List<ExprNodeDesc> inputs, List<TypeInfo> inputTypes, String genericUDAFName,
+            GenericUDAFEvaluator.Mode aggMode, boolean distinct, ObjectInspector oi, DataOutput output,
+            ExprNodeEvaluator[] evals, ObjectInspector[] pInspectors, Object[] parameterCache, SerDe serde,
+            LazyObject<? extends ObjectInspector> row, GenericUDAFEvaluator udafunctionPartial,
+            GenericUDAFEvaluator udafunctionComplete, ObjectInspector outputOi, ObjectInspector outputOiPartial) {
+        // shared object across threads
+        this.out = output;
+        this.mode = aggMode;
+        this.parameterInspectors = pInspectors;
+
+        // thread local objects
+        this.evaluators = evals;
+        this.cachedParameters = parameterCache;
+        this.cachedRowObject = row;
+        this.lazySer = serde;
+        this.udafPartial = udafunctionPartial;
+        this.udafComplete = udafunctionComplete;
+        this.outputInspector = outputOi;
+        this.outputInspectorPartial = outputOiPartial;
+    }
+
+    @Override
+    public void init() throws AlgebricksException {
+        try {
+            aggBuffer = udafPartial.getNewAggregationBuffer();
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        readIntoCache(tuple);
+        processRow();
+    }
+
+    private void processRow() throws AlgebricksException {
+        try {
+            // get values by evaluating them
+            for (int i = 0; i < cachedParameters.length; i++) {
+                cachedParameters[i] = evaluators[i].evaluate(cachedRowObject);
+            }
+            processAggregate();
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    private void processAggregate() throws HiveException {
+        /**
+         * accumulate the aggregation function
+         */
+        switch (mode) {
+            case PARTIAL1:
+            case COMPLETE:
+                udafPartial.iterate(aggBuffer, cachedParameters);
+                break;
+            case PARTIAL2:
+            case FINAL:
+                if (udafPartial instanceof GenericUDAFCount.GenericUDAFCountEvaluator) {
+                    Object parameter = ((PrimitiveObjectInspector) parameterInspectors[0])
+                            .getPrimitiveWritableObject(cachedParameters[0]);
+                    udafPartial.merge(aggBuffer, parameter);
+                } else
+                    udafPartial.merge(aggBuffer, cachedParameters[0]);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * serialize the result
+     *
+     * @param result
+     *            the evaluation result
+     * @throws IOException
+     * @throws AlgebricksException
+     */
+    private void serializeResult(Object result, ObjectInspector oi) throws IOException, AlgebricksException {
+        try {
+            BytesWritable outputWritable = (BytesWritable) lazySer.serialize(result, oi);
+            out.write(outputWritable.getBytes(), 0, outputWritable.getLength());
+        } catch (SerDeException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    /**
+     * bind the tuple reference to the cached row object
+     *
+     * @param r
+     */
+    private void readIntoCache(IFrameTupleReference r) {
+        cachedRowObject.init(r);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        // aggregator
+        try {
+            Object result = null;
+            result = udafPartial.terminatePartial(aggBuffer);
+            if (mode == GenericUDAFEvaluator.Mode.COMPLETE || mode == GenericUDAFEvaluator.Mode.FINAL) {
+                result = udafComplete.terminate(aggBuffer);
+                serializeResult(result, outputInspector);
+            } else {
+                serializeResult(result, outputInspectorPartial);
+            }
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        // aggregator.
+        try {
+            Object result = null;
+            // get aggregations
+            result = udafPartial.terminatePartial(aggBuffer);
+            serializeResult(result, outputInspectorPartial);
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+}
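
The switch in processAggregate() mirrors Hive's two-phase aggregation protocol: PARTIAL1 and COMPLETE consume raw rows through iterate(), while PARTIAL2 and FINAL consume partial aggregates through merge() (with a special case for count, whose partial value must first be unwrapped to its primitive writable). A sketch of the same protocol driven directly against Hive's count UDAF, assuming the Hive 0.7 API this patch builds on and a context that handles HiveException; the input values are made up:

    GenericUDAFEvaluator agg = new GenericUDAFCount.GenericUDAFCountEvaluator();
    ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.javaLongObjectInspector };
    agg.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputOIs);
    AggregationBuffer buf = agg.getNewAggregationBuffer(); // same role as init() above
    agg.iterate(buf, new Object[] { 42L });                // step(): PARTIAL1/COMPLETE branch
    agg.iterate(buf, new Object[] { 7L });
    Object partial = agg.terminatePartial(buf);            // finishPartial(): partial count is 2
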
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregatuibFunctionSerializableEvaluator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregatuibFunctionSerializableEvaluator.java
new file mode 100644
index 0000000..1933253
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregatuibFunctionSerializableEvaluator.java
@@ -0,0 +1,248 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class AggregatuibFunctionSerializableEvaluator implements ICopySerializableAggregateFunction {
+
+    /**
+     * the mode of aggregation function
+     */
+    private GenericUDAFEvaluator.Mode mode;
+
+    /**
+     * an array of evaluators
+     */
+    private ExprNodeEvaluator[] evaluators;
+
+    /**
+     * udaf evaluator partial
+     */
+    private GenericUDAFEvaluator udafPartial;
+
+    /**
+     * udaf evaluator complete
+     */
+    private GenericUDAFEvaluator udafComplete;
+
+    /**
+     * cached parameter objects
+     */
+    private Object[] cachedParameters;
+
+    /**
+     * cached row objects
+     */
+    private LazyObject<? extends ObjectInspector> cachedRowObject;
+
+    /**
+     * aggregation buffer
+     */
+    private SerializableBuffer aggBuffer;
+
+    /**
+     * we only use lazy serde to do serialization
+     */
+    private SerDe lazySer;
+
+    /**
+     * the output object inspector for this aggregation function
+     */
+    private ObjectInspector outputInspector;
+
+    /**
+     * the output object inspector for this aggregation function
+     */
+    private ObjectInspector outputInspectorPartial;
+
+    /**
+     * parameter inspectors
+     */
+    private ObjectInspector[] parameterInspectors;
+
+    /**
+     * make sure the aggregation function creates as few objects as possible
+     * 
+     * @param aggMode
+     * @param oi
+     * @param serde
+     */
+    public AggregatuibFunctionSerializableEvaluator(List<ExprNodeDesc> inputs, List<TypeInfo> inputTypes,
+            String genericUDAFName, GenericUDAFEvaluator.Mode aggMode, boolean distinct, ObjectInspector oi,
+            ExprNodeEvaluator[] evals, ObjectInspector[] pInspectors, Object[] parameterCache, SerDe serde,
+            LazyObject<? extends ObjectInspector> row, GenericUDAFEvaluator udafunctionPartial,
+            GenericUDAFEvaluator udafunctionComplete, ObjectInspector outputOi, ObjectInspector outputOiPartial)
+            throws AlgebricksException {
+        // shared object across threads
+        this.mode = aggMode;
+        this.parameterInspectors = pInspectors;
+
+        // thread local objects
+        this.evaluators = evals;
+        this.cachedParameters = parameterCache;
+        this.cachedRowObject = row;
+        this.lazySer = serde;
+        this.udafPartial = udafunctionPartial;
+        this.udafComplete = udafunctionComplete;
+        this.outputInspector = outputOi;
+        this.outputInspectorPartial = outputOiPartial;
+
+        try {
+            aggBuffer = (SerializableBuffer) udafPartial.getNewAggregationBuffer();
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void init(DataOutput output) throws AlgebricksException {
+        try {
+            udafPartial.reset(aggBuffer);
+            outputAggBuffer(aggBuffer, output);
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] data, int start, int len) throws AlgebricksException {
+        deSerializeAggBuffer(aggBuffer, data, start, len);
+        readIntoCache(tuple);
+        processRow();
+        serializeAggBuffer(aggBuffer, data, start, len);
+    }
+
+    private void processRow() throws AlgebricksException {
+        try {
+            // get values by evaluating them
+            for (int i = 0; i < cachedParameters.length; i++) {
+                cachedParameters[i] = evaluators[i].evaluate(cachedRowObject);
+            }
+            processAggregate();
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    private void processAggregate() throws HiveException {
+        /**
+         * accumulate the aggregation function
+         */
+        switch (mode) {
+            case PARTIAL1:
+            case COMPLETE:
+                udafPartial.iterate(aggBuffer, cachedParameters);
+                break;
+            case PARTIAL2:
+            case FINAL:
+                if (udafPartial instanceof GenericUDAFCount.GenericUDAFCountEvaluator) {
+                    Object parameter = ((PrimitiveObjectInspector) parameterInspectors[0])
+                            .getPrimitiveWritableObject(cachedParameters[0]);
+                    udafPartial.merge(aggBuffer, parameter);
+                } else
+                    udafPartial.merge(aggBuffer, cachedParameters[0]);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * serialize the result
+     * 
+     * @param result
+     *            the evaluation result
+     * @throws IOException
+     * @throws AlgebricksException
+     */
+    private void serializeResult(Object result, ObjectInspector oi, DataOutput out) throws IOException,
+            AlgebricksException {
+        try {
+            BytesWritable outputWritable = (BytesWritable) lazySer.serialize(result, oi);
+            out.write(outputWritable.getBytes(), 0, outputWritable.getLength());
+        } catch (SerDeException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    /**
+     * bind the tuple reference to the cached row object
+     * 
+     * @param r
+     */
+    private void readIntoCache(IFrameTupleReference r) {
+        cachedRowObject.init(r);
+    }
+
+    @Override
+    public void finish(byte[] data, int start, int len, DataOutput output) throws AlgebricksException {
+        deSerializeAggBuffer(aggBuffer, data, start, len);
+        // aggregator
+        try {
+            Object result = null;
+            result = udafPartial.terminatePartial(aggBuffer);
+            if (mode == GenericUDAFEvaluator.Mode.COMPLETE || mode == GenericUDAFEvaluator.Mode.FINAL) {
+                result = udafComplete.terminate(aggBuffer);
+                serializeResult(result, outputInspector, output);
+            } else {
+                serializeResult(result, outputInspectorPartial, output);
+            }
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial(byte[] data, int start, int len, DataOutput output) throws AlgebricksException {
+        deSerializeAggBuffer(aggBuffer, data, start, len);
+        // aggregator.
+        try {
+            Object result = null;
+            // get aggregations
+            result = udafPartial.terminatePartial(aggBuffer);
+            serializeResult(result, outputInspectorPartial, output);
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    private void serializeAggBuffer(SerializableBuffer buffer, byte[] data, int start, int len)
+            throws AlgebricksException {
+        buffer.serializeAggBuffer(data, start, len);
+    }
+
+    private void deSerializeAggBuffer(SerializableBuffer buffer, byte[] data, int start, int len)
+            throws AlgebricksException {
+        buffer.deSerializeAggBuffer(data, start, len);
+    }
+
+    private void outputAggBuffer(SerializableBuffer buffer, DataOutput out) throws AlgebricksException {
+        try {
+            buffer.serializeAggBuffer(out);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/BufferSerDeUtil.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/BufferSerDeUtil.java
new file mode 100644
index 0000000..96065e5
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/BufferSerDeUtil.java
@@ -0,0 +1,67 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+public class BufferSerDeUtil {
+
+    public static double getDouble(byte[] bytes, int offset) {
+        return Double.longBitsToDouble(getLong(bytes, offset));
+    }
+
+    public static float getFloat(byte[] bytes, int offset) {
+        return Float.intBitsToFloat(getInt(bytes, offset));
+    }
+
+    public static boolean getBoolean(byte[] bytes, int offset) {
+        if (bytes[offset] == 0)
+            return false;
+        else
+            return true;
+    }
+
+    public static int getInt(byte[] bytes, int offset) {
+        return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+                + ((bytes[offset + 3] & 0xff) << 0);
+    }
+
+    public static long getLong(byte[] bytes, int offset) {
+        return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
+                + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
+                + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
+                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+    }
+
+    public static void writeBoolean(boolean value, byte[] bytes, int offset) {
+        if (value)
+            bytes[offset] = (byte) 1;
+        else
+            bytes[offset] = (byte) 0;
+    }
+
+    public static void writeInt(int value, byte[] bytes, int offset) {
+        bytes[offset++] = (byte) (value >> 24);
+        bytes[offset++] = (byte) (value >> 16);
+        bytes[offset++] = (byte) (value >> 8);
+        bytes[offset++] = (byte) (value);
+    }
+
+    public static void writeLong(long value, byte[] bytes, int offset) {
+        bytes[offset++] = (byte) (value >> 56);
+        bytes[offset++] = (byte) (value >> 48);
+        bytes[offset++] = (byte) (value >> 40);
+        bytes[offset++] = (byte) (value >> 32);
+        bytes[offset++] = (byte) (value >> 24);
+        bytes[offset++] = (byte) (value >> 16);
+        bytes[offset++] = (byte) (value >> 8);
+        bytes[offset++] = (byte) (value);
+    }
+
+    public static void writeDouble(double value, byte[] bytes, int offset) {
+        long lValue = Double.doubleToLongBits(value);
+        writeLong(lValue, bytes, offset);
+    }
+
+    public static void writeFloat(float value, byte[] bytes, int offset) {
+        int iValue = Float.floatToIntBits(value);
+        writeInt(iValue, bytes, offset);
+    }
+
+}
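
BufferSerDeUtil reads and writes fixed-width big-endian values at explicit offsets, which is what lets an aggregation buffer live directly inside a frame's byte array. A quick round trip as a sanity check (3.25 is chosen because it is exactly representable in binary):

    byte[] buf = new byte[12]; // 4 bytes for the int, 8 for the double
    BufferSerDeUtil.writeInt(7, buf, 0);
    BufferSerDeUtil.writeDouble(3.25, buf, 4);
    assert BufferSerDeUtil.getInt(buf, 0) == 7;
    assert BufferSerDeUtil.getDouble(buf, 4) == 3.25;
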
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ColumnExpressionEvaluator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ColumnExpressionEvaluator.java
new file mode 100644
index 0000000..5647f6a
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ColumnExpressionEvaluator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ColumnExpressionEvaluator extends AbstractExpressionEvaluator {
+
+    public ColumnExpressionEvaluator(ExprNodeColumnDesc expr, ObjectInspector oi, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(new ExprNodeColumnEvaluator(expr), oi, output);
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ConstantExpressionEvaluator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ConstantExpressionEvaluator.java
new file mode 100644
index 0000000..d8796ea
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ConstantExpressionEvaluator.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ConstantExpressionEvaluator extends AbstractExpressionEvaluator {
+
+    public ConstantExpressionEvaluator(ExprNodeConstantDesc expr, ObjectInspector oi, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(new ExprNodeConstantEvaluator(expr), oi, output);
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FieldExpressionEvaluator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FieldExpressionEvaluator.java
new file mode 100644
index 0000000..35560b6
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FieldExpressionEvaluator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeFieldEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class FieldExpressionEvaluator extends AbstractExpressionEvaluator {
+
+    public FieldExpressionEvaluator(ExprNodeFieldDesc expr, ObjectInspector oi, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(new ExprNodeFieldEvaluator(expr), oi, output);
+    }
+
+}
\ No newline at end of file
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FunctionExpressionEvaluator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FunctionExpressionEvaluator.java
new file mode 100644
index 0000000..7ffec7a
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FunctionExpressionEvaluator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeGenericFuncEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class FunctionExpressionEvaluator extends AbstractExpressionEvaluator {
+
+    public FunctionExpressionEvaluator(ExprNodeGenericFuncDesc expr, ObjectInspector oi, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(new ExprNodeGenericFuncEvaluator(expr), oi, output);
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/NullExpressionEvaluator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/NullExpressionEvaluator.java
new file mode 100644
index 0000000..ca60385
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/NullExpressionEvaluator.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeNullEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class NullExpressionEvaluator extends AbstractExpressionEvaluator {
+
+    public NullExpressionEvaluator(ExprNodeNullDesc expr, ObjectInspector oi, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(new ExprNodeNullEvaluator(expr), oi, output);
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/SerializableBuffer.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/SerializableBuffer.java
new file mode 100644
index 0000000..676989e
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/SerializableBuffer.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+
+public interface SerializableBuffer extends AggregationBuffer {
+
+    public void deSerializeAggBuffer(byte[] data, int start, int len);
+
+    public void serializeAggBuffer(byte[] data, int start, int len);
+
+    public void serializeAggBuffer(DataOutput output) throws IOException;
+
+}
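
A SerializableBuffer is an AggregationBuffer whose state can be copied to and from a byte slice in place, which is what lets AggregatuibFunctionSerializableEvaluator rehydrate it from frame bytes in step() and write the updated state straight back. A minimal sketch of an implementation, a hypothetical fixed-width sum/count buffer built on the BufferSerDeUtil helpers from this patch (not itself part of the patch):

    import java.io.DataOutput;
    import java.io.IOException;

    public class SumCountBuffer implements SerializableBuffer {
        private long sum;   // 8 bytes at 'start'
        private long count; // 8 bytes at 'start + 8'

        @Override
        public void deSerializeAggBuffer(byte[] data, int start, int len) {
            sum = BufferSerDeUtil.getLong(data, start);
            count = BufferSerDeUtil.getLong(data, start + 8);
        }

        @Override
        public void serializeAggBuffer(byte[] data, int start, int len) {
            BufferSerDeUtil.writeLong(sum, data, start);
            BufferSerDeUtil.writeLong(count, data, start + 8);
        }

        @Override
        public void serializeAggBuffer(DataOutput output) throws IOException {
            // init()-time image; must match the 16-byte fixed-width layout above
            output.writeLong(sum);
            output.writeLong(count);
        }
    }
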
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/UDTFFunctionEvaluator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/UDTFFunctionEvaluator.java
new file mode 100644
index 0000000..2e78663
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/UDTFFunctionEvaluator.java
@@ -0,0 +1,143 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hivesterix.serde.lazy.LazyColumnar;
+import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class UDTFFunctionEvaluator implements ICopyUnnestingFunction, Collector {
+
+    /**
+     * udtf function
+     */
+    private UDTFDesc func;
+
+    /**
+     * input object inspector
+     */
+    private ObjectInspector inputInspector;
+
+    /**
+     * output object inspector
+     */
+    private ObjectInspector outputInspector;
+
+    /**
+     * object inspector for udtf
+     */
+    private ObjectInspector[] udtfInputOIs;
+
+    /**
+     * generic udtf
+     */
+    private GenericUDTF udtf;
+
+    /**
+     * data output
+     */
+    private DataOutput out;
+
+    /**
+     * the input row object
+     */
+    private LazyColumnar cachedRowObject;
+
+    /**
+     * cached row object (input)
+     */
+    private Object[] cachedInputObjects;
+
+    /**
+     * serialization/deserialization
+     */
+    private SerDe lazySerDe;
+
+    /**
+     * columns fed into the UDTF
+     */
+    private int[] columns;
+
+    public UDTFFunctionEvaluator(UDTFDesc desc, Schema schema, int[] cols, DataOutput output) {
+        this.func = desc;
+        this.inputInspector = schema.toObjectInspector();
+        udtf = func.getGenericUDTF();
+        out = output;
+        columns = cols;
+    }
+
+    @Override
+    public void init(IFrameTupleReference tuple) throws AlgebricksException {
+        cachedInputObjects = new LazyObject[columns.length];
+        try {
+            cachedRowObject = (LazyColumnar) LazyFactory.createLazyObject(inputInspector);
+            outputInspector = udtf.initialize(udtfInputOIs);
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        }
+        udtf.setCollector(this);
+        lazySerDe = new LazySerDe();
+        readIntoCache(tuple);
+    }
+
+    @Override
+    public boolean step() throws AlgebricksException {
+        try {
+            udtf.process(cachedInputObjects);
+            return true;
+        } catch (HiveException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    /**
+     * bind the tuple reference to the cached row object
+     *
+     * @param r
+     */
+    private void readIntoCache(IFrameTupleReference r) {
+        cachedRowObject.init(r);
+        for (int i = 0; i < cachedInputObjects.length; i++) {
+            cachedInputObjects[i] = cachedRowObject.getField(columns[i]);
+        }
+    }
+
+    /**
+     * serialize the result
+     *
+     * @param result
+     *            the evaluation result
+     * @throws SerDeException
+     * @throws IOException
+     */
+    private void serializeResult(Object result) throws SerDeException, IOException {
+        BytesWritable outputWritable = (BytesWritable) lazySerDe.serialize(result, outputInspector);
+        out.write(outputWritable.getBytes(), 0, outputWritable.getLength());
+    }
+
+    @Override
+    public void collect(Object input) throws HiveException {
+        try {
+            serializeResult(input);
+        } catch (IOException e) {
+            throw new HiveException(e);
+        } catch (SerDeException e) {
+            throw new HiveException(e);
+        }
+    }
+}
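
The unnesting flow is re-entrant: init() registers this evaluator as the UDTF's Collector, so each step() call into udtf.process(...) comes back through collect(...) above, which serializes every emitted row into the output frame. For reference, a toy GenericUDTF that emits each element of a list argument, sketched against the Hive 0.7 contract (hypothetical class, not in this patch):

    public class ToyExplode extends GenericUDTF {
        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
            // assume a single list<bigint> argument; emit one bigint column named "col"
            List<String> names = Arrays.asList("col");
            List<ObjectInspector> ois = Arrays.asList(
                    (ObjectInspector) PrimitiveObjectInspectorFactory.javaLongObjectInspector);
            return ObjectInspectorFactory.getStandardStructObjectInspector(names, ois);
        }

        @Override
        public void process(Object[] args) throws HiveException {
            for (Object o : (List<?>) args[0]) {
                forward(new Object[] { o }); // re-enters the registered Collector
            }
        }

        @Override
        public void close() throws HiveException {
        }
    }
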
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryAscComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..f3b76e4
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryAscComparatorFactory.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveByteBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static HiveByteBinaryAscComparatorFactory INSTANCE = new HiveByteBinaryAscComparatorFactory();
+
+    private HiveByteBinaryAscComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private byte left;
+            private byte right;
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                left = b1[s1];
+                right = b2[s2];
+                if (left > right)
+                    return 1;
+                else if (left == right)
+                    return 0;
+                else
+                    return -1;
+            }
+        };
+    }
+
+}
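
The comparator factories here all follow the same pattern: a serializable singleton whose createBinaryComparator() returns a comparator that decodes the two values straight from their serialized bytes. Hyracks sort and group operators consume them as IBinaryComparatorFactory[]. Direct use, for illustration:

    IBinaryComparator cmp = HiveByteBinaryAscComparatorFactory.INSTANCE.createBinaryComparator();
    byte[] a = { 3 };
    byte[] b = { 7 };
    int c = cmp.compare(a, 0, 1, b, 0, 1); // -1: ascending order ranks 3 before 7
    // the Desc variants simply flip the sign of the same comparison
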
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryDescComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..8d452dc
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryDescComparatorFactory.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveByteBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static HiveByteBinaryDescComparatorFactory INSTANCE = new HiveByteBinaryDescComparatorFactory();
+
+    private HiveByteBinaryDescComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private byte left;
+            private byte right;
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                left = b1[s1];
+                right = b2[s2];
+                if (left > right)
+                    return -1;
+                else if (left == right)
+                    return 0;
+                else
+                    return 1;
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryAscComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..0b5350a
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryAscComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveDoubleBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static HiveDoubleBinaryAscComparatorFactory INSTANCE = new HiveDoubleBinaryAscComparatorFactory();
+
+    private HiveDoubleBinaryAscComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private double left;
+            private double right;
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                left = Double.longBitsToDouble(LazyUtils.byteArrayToLong(b1, s1));
+                right = Double.longBitsToDouble(LazyUtils.byteArrayToLong(b2, s2));
+                if (left > right)
+                    return 1;
+                else if (left == right)
+                    return 0;
+                else
+                    return -1;
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryDescComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..2405956
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryDescComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveDoubleBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static HiveDoubleBinaryDescComparatorFactory INSTANCE = new HiveDoubleBinaryDescComparatorFactory();
+
+    private HiveDoubleBinaryDescComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private double left;
+            private double right;
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                left = Double.longBitsToDouble(LazyUtils.byteArrayToLong(b1, s1));
+                right = Double.longBitsToDouble(LazyUtils.byteArrayToLong(b2, s2));
+                if (left > right)
+                    return -1;
+                else if (left == right)
+                    return 0;
+                else
+                    return 1;
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryAscComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..05a43e6
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryAscComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveFloatBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static HiveFloatBinaryAscComparatorFactory INSTANCE = new HiveFloatBinaryAscComparatorFactory();
+
+    private HiveFloatBinaryAscComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private float left;
+            private float right;
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                left = Float.intBitsToFloat(LazyUtils.byteArrayToInt(b1, s1));
+                right = Float.intBitsToFloat(LazyUtils.byteArrayToInt(b2, s2));
+                if (left > right)
+                    return 1;
+                else if (left == right)
+                    return 0;
+                else
+                    return -1;
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryDescComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..2c44f97
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryDescComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveFloatBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final HiveFloatBinaryDescComparatorFactory INSTANCE = new HiveFloatBinaryDescComparatorFactory();
+
+    private HiveFloatBinaryDescComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private float left;
+            private float right;
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                left = Float.intBitsToFloat(LazyUtils.byteArrayToInt(b1, s1));
+                right = Float.intBitsToFloat(LazyUtils.byteArrayToInt(b2, s2));
+                if (left > right)
+                    return -1;
+                else if (left == right)
+                    return 0;
+                else
+                    return 1;
+            }
+        };
+    }
+
+}
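
Each Desc comparator in this patch is its Asc twin with the return values negated. The same effect can be obtained once, generically, by wrapping any ascending IBinaryComparator; a possible refactoring sketch (not part of this patch):

import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;

public class ReverseBinaryComparator implements IBinaryComparator {
    private final IBinaryComparator asc;

    public ReverseBinaryComparator(IBinaryComparator asc) {
        this.asc = asc;
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Safe because the wrapped comparators return only -1/0/1;
        // negating Integer.MIN_VALUE would not be.
        return -asc.compare(b1, s1, l1, b2, s2, l2);
    }
}
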
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryAscComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..0127791
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryAscComparatorFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveIntegerBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final HiveIntegerBinaryAscComparatorFactory INSTANCE = new HiveIntegerBinaryAscComparatorFactory();
+
+    private HiveIntegerBinaryAscComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private VInt left = new VInt();
+            private VInt right = new VInt();
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                LazyUtils.readVInt(b1, s1, left);
+                LazyUtils.readVInt(b2, s2, right);
+
+                if (left.length != l1 || right.length != l2)
+                    throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+                            + left.length + "," + right.length + " expected " + l1 + "," + l2);
+
+                if (left.value > right.value)
+                    return 1;
+                else if (left.value == right.value)
+                    return 0;
+                else
+                    return -1;
+            }
+        };
+    }
+}
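
The integer comparators decode a variable-length int (vint) from each field and then insist that the decoded byte count equals the field length handed in by the frame (l1/l2); anything else means the slot did not hold exactly one vint. A self-contained illustration of that guard, using a toy LEB128-style varint codec that is deliberately not LazyUtils's actual vint format:

public class VarintLengthGuardSketch {
    // Toy varint: 7 value bits per byte, high bit marks continuation.
    static int write(int v, byte[] out, int off) {
        int n = 0;
        do {
            int b = v & 0x7f;
            v >>>= 7;
            out[off + n++] = (byte) (v == 0 ? b : (b | 0x80));
        } while (v != 0);
        return n; // encoded length in bytes
    }

    static int[] read(byte[] in, int off) {
        int v = 0, shift = 0, n = 0, b;
        do {
            b = in[off + n++];
            v |= (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return new int[] { v, n }; // decoded value and consumed length
    }

    public static void main(String[] args) {
        byte[] buf = new byte[5];
        int slotLen = write(300, buf, 0);
        int[] r = read(buf, 0);
        // The comparator's check: the decoded length must cover the whole
        // slot, or the bytes being compared are not a lone varint.
        if (r[1] != slotLen)
            throw new IllegalArgumentException("length mismatch");
        System.out.println(r[0] + " in " + r[1] + " bytes"); // 300 in 2 bytes
    }
}
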
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryDescComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..5116337
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryDescComparatorFactory.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveIntegerBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final HiveIntegerBinaryDescComparatorFactory INSTANCE = new HiveIntegerBinaryDescComparatorFactory();
+
+    private HiveIntegerBinaryDescComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private VInt left = new VInt();
+            private VInt right = new VInt();
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                LazyUtils.readVInt(b1, s1, left);
+                LazyUtils.readVInt(b2, s2, right);
+                if (left.length != l1 || right.length != l2)
+                    throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+                            + left.length + "," + right.length + " expected " + l1 + "," + l2);
+                if (left.value > right.value)
+                    return -1;
+                else if (left.value == right.value)
+                    return 0;
+                else
+                    return 1;
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryAscComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..fa416a9
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryAscComparatorFactory.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VLong;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveLongBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final HiveLongBinaryAscComparatorFactory INSTANCE = new HiveLongBinaryAscComparatorFactory();
+
+    private HiveLongBinaryAscComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private VLong left = new VLong();
+            private VLong right = new VLong();
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                LazyUtils.readVLong(b1, s1, left);
+                LazyUtils.readVLong(b2, s2, right);
+                if (left.length != l1 || right.length != l2)
+                    throw new IllegalArgumentException("length mismatch in long comparator function actual: "
+                            + left.length + "," + right.length + " expected " + l1 + "," + l2);
+                if (left.value > right.value)
+                    return 1;
+                else if (left.value == right.value)
+                    return 0;
+                else
+                    return -1;
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryDescComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..e72dc62
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryDescComparatorFactory.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VLong;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveLongBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final HiveLongBinaryDescComparatorFactory INSTANCE = new HiveLongBinaryDescComparatorFactory();
+
+    private HiveLongBinaryDescComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private VLong left = new VLong();
+            private VLong right = new VLong();
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                LazyUtils.readVLong(b1, s1, left);
+                LazyUtils.readVLong(b2, s2, right);
+                if (left.length != l1 || right.length != l2)
+                    throw new IllegalArgumentException("length mismatch in long comparator function actual: "
+                            + left.length + "," + right.length + " expected " + l1 + "," + l2);
+                if (left.value > right.value)
+                    return -1;
+                else if (left.value == right.value)
+                    return 0;
+                else
+                    return 1;
+            }
+        };
+    }
+}
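
All of these numeric comparators spell out the three-way branch instead of the tempting shortcut return (int) (left.value - right.value). For longs the subtraction shortcut is wrong: it can overflow and flip the sign. A two-line demonstration (Long.compare is Java 7+):

public class SubtractionOverflowDemo {
    public static void main(String[] args) {
        long left = Long.MAX_VALUE;
        long right = -2L;
        System.out.println(left - right < 0);          // true: overflow flips the sign
        System.out.println(Long.compare(left, right)); // 1: left really is greater
    }
}
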
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryAscComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..a3745fa
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryAscComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveShortBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final HiveShortBinaryAscComparatorFactory INSTANCE = new HiveShortBinaryAscComparatorFactory();
+
+    private HiveShortBinaryAscComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private short left;
+            private short right;
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                left = LazyUtils.byteArrayToShort(b1, s1);
+                right = LazyUtils.byteArrayToShort(b2, s2);
+                if (left > right)
+                    return 1;
+                else if (left == right)
+                    return 0;
+                else
+                    return -1;
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryDescComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..44d3f43
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryDescComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveShortBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final HiveShortBinaryDescComparatorFactory INSTANCE = new HiveShortBinaryDescComparatorFactory();
+
+    private HiveShortBinaryDescComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private short left;
+            private short right;
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                left = LazyUtils.byteArrayToShort(b1, s1);
+                right = LazyUtils.byteArrayToShort(b2, s2);
+                if (left > right)
+                    return -1;
+                else if (left == right)
+                    return 0;
+                else
+                    return 1;
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryAscComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..6da9716
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryAscComparatorFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveStringBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final HiveStringBinaryAscComparatorFactory INSTANCE = new HiveStringBinaryAscComparatorFactory();
+
+    private HiveStringBinaryAscComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private VInt leftLen = new VInt();
+            private VInt rightLen = new VInt();
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                LazyUtils.readVInt(b1, s1, leftLen);
+                LazyUtils.readVInt(b2, s2, rightLen);
+
+                if (leftLen.value + leftLen.length != l1 || rightLen.value + rightLen.length != l2)
+                    throw new IllegalStateException("parse string: length mismatch, expected "
+                            + (leftLen.value + leftLen.length) + ", " + (rightLen.value + rightLen.length)
+                            + " but got " + l1 + ", " + l2);
+
+                return Text.Comparator.compareBytes(b1, s1 + leftLen.length, l1 - leftLen.length, b2, s2
+                        + rightLen.length, l2 - rightLen.length);
+            }
+        };
+    }
+
+}
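
The string comparators skip each field's vint length header and then compare the remaining bytes lexicographically via Hadoop's WritableComparator (Text.Comparator inherits the same static compareBytes). Since byte-wise comparison of UTF-8 agrees with code-point order, this matches string order. A minimal standalone check:

import java.io.UnsupportedEncodingException;
import org.apache.hadoop.io.WritableComparator;

public class BytewiseStringCompareSketch {
    public static void main(String[] args) throws UnsupportedEncodingException {
        byte[] a = "apple".getBytes("UTF-8");
        byte[] b = "banana".getBytes("UTF-8");
        // Raw byte comparison, as done above once the vint headers are skipped.
        int cmp = WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length);
        System.out.println(cmp < 0); // true: "apple" sorts before "banana"
    }
}
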
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryDescComparatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..c579711
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryDescComparatorFactory.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveStringBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final HiveStringBinaryDescComparatorFactory INSTANCE = new HiveStringBinaryDescComparatorFactory();
+
+    private HiveStringBinaryDescComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            private VInt leftLen = new VInt();
+            private VInt rightLen = new VInt();
+
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                LazyUtils.readVInt(b1, s1, leftLen);
+                LazyUtils.readVInt(b2, s2, rightLen);
+
+                if (leftLen.value + leftLen.length != l1 || rightLen.value + rightLen.length != l2)
+                    throw new IllegalStateException("parse string: length mismatch, expected "
+                            + (leftLen.value + leftLen.length) + ", " + (rightLen.value + rightLen.length)
+                            + " but got " + l1 + ", " + l2);
+
+                return -WritableComparator.compareBytes(b1, s1 + leftLen.length, l1 - leftLen.length, b2, s2
+                        + rightLen.length, l2 - rightLen.length);
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionFactory.java
new file mode 100644
index 0000000..d664341
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionFactory.java
@@ -0,0 +1,367 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.AggregationFunctionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class AggregationFunctionFactory implements ICopyAggregateFunctionFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * list of parameters' serialization
+     */
+    private List<String> parametersSerialization = new ArrayList<String>();
+
+    /**
+     * the name of the udf
+     */
+    private String genericUDAFName;
+
+    /**
+     * aggregation mode
+     */
+    private GenericUDAFEvaluator.Mode mode;
+
+    /**
+     * list of type info
+     */
+    private List<TypeInfo> types = new ArrayList<TypeInfo>();
+
+    /**
+     * distinct or not
+     */
+    private boolean distinct;
+
+    /**
+     * the schema of incoming rows
+     */
+    private Schema rowSchema;
+
+    /**
+     * list of parameters
+     */
+    private transient List<ExprNodeDesc> parametersOrigin;
+
+    /**
+     * row inspector
+     */
+    private transient ObjectInspector rowInspector = null;
+
+    /**
+     * output object inspector
+     */
+    private transient ObjectInspector outputInspector = null;
+
+    /**
+     * output object inspector
+     */
+    private transient ObjectInspector outputInspectorPartial = null;
+
+    /**
+     * parameter inspectors
+     */
+    private transient ObjectInspector[] parameterInspectors = null;
+
+    /**
+     * expression desc
+     */
+    private transient HashMap<Long, List<ExprNodeDesc>> parameterExprs = new HashMap<Long, List<ExprNodeDesc>>();
+
+    /**
+     * evaluators
+     */
+    private transient HashMap<Long, ExprNodeEvaluator[]> evaluators = new HashMap<Long, ExprNodeEvaluator[]>();
+
+    /**
+     * cached parameter objects
+     */
+    private transient HashMap<Long, Object[]> cachedParameters = new HashMap<Long, Object[]>();
+
+    /**
+     * cached row object: one per thread
+     */
+    private transient HashMap<Long, LazyObject<? extends ObjectInspector>> cachedRowObjects = new HashMap<Long, LazyObject<? extends ObjectInspector>>();
+
+    /**
+     * we only use lazy serde to do serialization
+     */
+    private transient HashMap<Long, SerDe> serDe = new HashMap<Long, SerDe>();
+
+    /**
+     * udaf evaluators
+     */
+    private transient HashMap<Long, GenericUDAFEvaluator> udafsPartial = new HashMap<Long, GenericUDAFEvaluator>();
+
+    /**
+     * udaf evaluators
+     */
+    private transient HashMap<Long, GenericUDAFEvaluator> udafsComplete = new HashMap<Long, GenericUDAFEvaluator>();
+
+    /**
+     * aggregation function desc
+     */
+    private transient AggregationDesc aggregator;
+
+    /**
+     * @param expression
+     *            Algebricks function call expression
+     * @param oi
+     *            schema
+     */
+    public AggregationFunctionFactory(AggregateFunctionCallExpression expression, Schema oi,
+            IVariableTypeEnvironment env) throws AlgebricksException {
+
+        try {
+            aggregator = (AggregationDesc) ExpressionTranslator.getHiveExpression(expression, env);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException(e.getMessage());
+        }
+        init(aggregator.getParameters(), aggregator.getGenericUDAFName(), aggregator.getMode(),
+                aggregator.getDistinct(), oi);
+    }
+
+    /**
+     * constructor of aggregation function factory
+     * 
+     * @param inputs
+     * @param name
+     * @param udafMode
+     * @param distinct
+     * @param oi
+     */
+    private void init(List<ExprNodeDesc> inputs, String name, GenericUDAFEvaluator.Mode udafMode, boolean distinct,
+            Schema oi) {
+        parametersOrigin = inputs;
+        genericUDAFName = name;
+        mode = udafMode;
+        this.distinct = distinct;
+        rowSchema = oi;
+
+        for (ExprNodeDesc input : inputs) {
+            TypeInfo type = input.getTypeInfo();
+            if (type instanceof StructTypeInfo) {
+                types.add(TypeInfoFactory.doubleTypeInfo);
+            } else
+                types.add(type);
+
+            String s = Utilities.serializeExpression(input);
+            parametersSerialization.add(s);
+        }
+    }
+
+    @Override
+    public synchronized ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider)
+            throws AlgebricksException {
+        if (parametersOrigin == null) {
+            Configuration config = new Configuration();
+            config.setClassLoader(this.getClass().getClassLoader());
+            /**
+             * in case of Class.forName(...) calls in Hive code
+             */
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+            parametersOrigin = new ArrayList<ExprNodeDesc>();
+            for (String serialization : parametersSerialization) {
+                parametersOrigin.add(Utilities.deserializeExpression(serialization, config));
+            }
+        }
+
+        /**
+         * exprs
+         */
+        if (parameterExprs == null)
+            parameterExprs = new HashMap<Long, List<ExprNodeDesc>>();
+
+        /**
+         * evaluators
+         */
+        if (evaluators == null)
+            evaluators = new HashMap<Long, ExprNodeEvaluator[]>();
+
+        /**
+         * cached parameter objects
+         */
+        if (cachedParameters == null)
+            cachedParameters = new HashMap<Long, Object[]>();
+
+        /**
+         * cached row object: one per thread
+         */
+        if (cachedRowObjects == null)
+            cachedRowObjects = new HashMap<Long, LazyObject<? extends ObjectInspector>>();
+
+        /**
+         * we only use lazy serde to do serialization
+         */
+        if (serDe == null)
+            serDe = new HashMap<Long, SerDe>();
+
+        /**
+         * UDAF functions
+         */
+        if (udafsComplete == null)
+            udafsComplete = new HashMap<Long, GenericUDAFEvaluator>();
+
+        /**
+         * UDAF functions
+         */
+        if (udafsPartial == null)
+            udafsPartial = new HashMap<Long, GenericUDAFEvaluator>();
+
+        if (parameterInspectors == null)
+            parameterInspectors = new ObjectInspector[parametersOrigin.size()];
+
+        if (rowInspector == null)
+            rowInspector = rowSchema.toObjectInspector();
+
+        // get current thread id
+        long threadId = Thread.currentThread().getId();
+
+        /**
+         * expressions, expressions are thread local
+         */
+        List<ExprNodeDesc> parameters = parameterExprs.get(threadId);
+        if (parameters == null) {
+            parameters = new ArrayList<ExprNodeDesc>();
+            for (ExprNodeDesc parameter : parametersOrigin)
+                parameters.add(parameter.clone());
+            parameterExprs.put(threadId, parameters);
+        }
+
+        /**
+         * cached parameter objects
+         */
+        Object[] cachedParas = cachedParameters.get(threadId);
+        if (cachedParas == null) {
+            cachedParas = new Object[parameters.size()];
+            cachedParameters.put(threadId, cachedParas);
+        }
+
+        /**
+         * cached row object: one per thread
+         */
+        LazyObject<? extends ObjectInspector> cachedRowObject = cachedRowObjects.get(threadId);
+        if (cachedRowObject == null) {
+            cachedRowObject = LazyFactory.createLazyObject(rowInspector);
+            cachedRowObjects.put(threadId, cachedRowObject);
+        }
+
+        /**
+         * we only use lazy serde to do serialization
+         */
+        SerDe lazySer = serDe.get(threadId);
+        if (lazySer == null) {
+            lazySer = new LazySerDe();
+            serDe.put(threadId, lazySer);
+        }
+
+        /**
+         * evaluators
+         */
+        ExprNodeEvaluator[] evals = evaluators.get(threadId);
+        if (evals == null) {
+            evals = new ExprNodeEvaluator[parameters.size()];
+            evaluators.put(threadId, evals);
+        }
+
+        GenericUDAFEvaluator udafPartial;
+        GenericUDAFEvaluator udafComplete;
+
+        // initialize object inspectors
+        try {
+            /**
+             * evaluators, udf, object inspectors are shared in one thread
+             */
+            for (int i = 0; i < evals.length; i++) {
+                if (evals[i] == null) {
+                    evals[i] = ExprNodeEvaluatorFactory.get(parameters.get(i));
+                    if (parameterInspectors[i] == null) {
+                        parameterInspectors[i] = evals[i].initialize(rowInspector);
+                    } else {
+                        evals[i].initialize(rowInspector);
+                    }
+                }
+            }
+
+            udafComplete = udafsComplete.get(threadId);
+            if (udafComplete == null) {
+                try {
+                    udafComplete = FunctionRegistry.getGenericUDAFEvaluator(genericUDAFName, types, distinct, false);
+                } catch (HiveException e) {
+                    throw new AlgebricksException(e);
+                }
+                udafsComplete.put(threadId, udafComplete);
+                udafComplete.init(mode, parameterInspectors);
+            }
+
+            // multiple stage group by, determined by the mode parameter
+            if (outputInspector == null)
+                outputInspector = udafComplete.init(mode, parameterInspectors);
+
+            // initialize the partial group-by udaf
+            GenericUDAFEvaluator.Mode partialMode;
+            // adjust mode for external groupby
+            if (mode == GenericUDAFEvaluator.Mode.COMPLETE)
+                partialMode = GenericUDAFEvaluator.Mode.PARTIAL1;
+            else if (mode == GenericUDAFEvaluator.Mode.FINAL)
+                partialMode = GenericUDAFEvaluator.Mode.PARTIAL2;
+            else
+                partialMode = mode;
+            udafPartial = udafsPartial.get(threadId);
+            if (udafPartial == null) {
+                try {
+                    udafPartial = FunctionRegistry.getGenericUDAFEvaluator(genericUDAFName, types, distinct, false);
+                } catch (HiveException e) {
+                    throw new AlgebricksException(e);
+                }
+                udafPartial.init(partialMode, parameterInspectors);
+                udafsPartial.put(threadId, udafPartial);
+            }
+
+            // multiple stage group by, determined by the mode parameter
+            if (outputInspectorPartial == null)
+                outputInspectorPartial = udafPartial.init(partialMode, parameterInspectors);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException(e);
+        }
+
+        return new AggregationFunctionEvaluator(parameters, types, genericUDAFName, mode, distinct, rowInspector,
+                provider.getDataOutput(), evals, parameterInspectors, cachedParas, lazySer, cachedRowObject,
+                udafPartial, udafComplete, outputInspector, outputInspectorPartial);
+    }
+
+    public String toString() {
+        return "aggregation function expression evaluator factory: " + this.genericUDAFName;
+    }
+}
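
createAggregateFunction keeps all per-thread state (expressions, evaluators, serdes, UDAF evaluators) in HashMap<Long, ...> maps keyed by Thread.currentThread().getId(), with the whole method synchronized so concurrent tasks cannot corrupt the maps. An equivalent idiom with less locking is ThreadLocal, sketched below as an alternative design (not what this patch does); one trade-off is that the thread-id maps here never evict entries for finished threads, which ThreadLocal handles automatically:

public abstract class PerThreadCache<T> {
    // One instance of T per thread, created on first access from that thread.
    private final ThreadLocal<T> cache = new ThreadLocal<T>() {
        @Override
        protected T initialValue() {
            return create();
        }
    };

    protected abstract T create();

    public T get() {
        return cache.get();
    }
}

For example, the per-thread LazySerDe above could become: new PerThreadCache<SerDe>() { protected SerDe create() { return new LazySerDe(); } }.
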
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionSerializableFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionSerializableFactory.java
new file mode 100644
index 0000000..54a1155
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionSerializableFactory.java
@@ -0,0 +1,366 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.AggregatuibFunctionSerializableEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class AggregationFunctionSerializableFactory implements ICopySerializableAggregateFunctionFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * list of parameters' serialization
+     */
+    private List<String> parametersSerialization = new ArrayList<String>();
+
+    /**
+     * the name of the udf
+     */
+    private String genericUDAFName;
+
+    /**
+     * aggregation mode
+     */
+    private GenericUDAFEvaluator.Mode mode;
+
+    /**
+     * list of type info
+     */
+    private List<TypeInfo> types = new ArrayList<TypeInfo>();
+
+    /**
+     * distinct or not
+     */
+    private boolean distinct;
+
+    /**
+     * the schema of incoming rows
+     */
+    private Schema rowSchema;
+
+    /**
+     * list of parameters
+     */
+    private transient List<ExprNodeDesc> parametersOrigin;
+
+    /**
+     * row inspector
+     */
+    private transient ObjectInspector rowInspector = null;
+
+    /**
+     * output object inspector
+     */
+    private transient ObjectInspector outputInspector = null;
+
+    /**
+     * output object inspector
+     */
+    private transient ObjectInspector outputInspectorPartial = null;
+
+    /**
+     * parameter inspectors
+     */
+    private transient ObjectInspector[] parameterInspectors = null;
+
+    /**
+     * expression desc
+     */
+    private transient HashMap<Long, List<ExprNodeDesc>> parameterExprs = new HashMap<Long, List<ExprNodeDesc>>();
+
+    /**
+     * evaluators
+     */
+    private transient HashMap<Long, ExprNodeEvaluator[]> evaluators = new HashMap<Long, ExprNodeEvaluator[]>();
+
+    /**
+     * cached parameter objects
+     */
+    private transient HashMap<Long, Object[]> cachedParameters = new HashMap<Long, Object[]>();
+
+    /**
+     * cached row object: one per thread
+     */
+    private transient HashMap<Long, LazyObject<? extends ObjectInspector>> cachedRowObjects = new HashMap<Long, LazyObject<? extends ObjectInspector>>();
+
+    /**
+     * we only use lazy serde to do serialization
+     */
+    private transient HashMap<Long, SerDe> serDe = new HashMap<Long, SerDe>();
+
+    /**
+     * udaf evaluators
+     */
+    private transient HashMap<Long, GenericUDAFEvaluator> udafsPartial = new HashMap<Long, GenericUDAFEvaluator>();
+
+    /**
+     * udaf evaluators
+     */
+    private transient HashMap<Long, GenericUDAFEvaluator> udafsComplete = new HashMap<Long, GenericUDAFEvaluator>();
+
+    /**
+     * aggregation function desc
+     */
+    private transient AggregationDesc aggregator;
+
+    /**
+     * @param expression
+     *            Algebricks function call expression
+     * @param oi
+     *            schema
+     */
+    public AggregationFunctionSerializableFactory(AggregateFunctionCallExpression expression, Schema oi,
+            IVariableTypeEnvironment env) throws AlgebricksException {
+
+        try {
+            aggregator = (AggregationDesc) ExpressionTranslator.getHiveExpression(expression, env);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException(e.getMessage());
+        }
+        init(aggregator.getParameters(), aggregator.getGenericUDAFName(), aggregator.getMode(),
+                aggregator.getDistinct(), oi);
+    }
+
+    /**
+     * constructor of aggregation function factory
+     * 
+     * @param inputs
+     * @param name
+     * @param udafMode
+     * @param distinct
+     * @param oi
+     */
+    private void init(List<ExprNodeDesc> inputs, String name, GenericUDAFEvaluator.Mode udafMode, boolean distinct,
+            Schema oi) {
+        parametersOrigin = inputs;
+        genericUDAFName = name;
+        mode = udafMode;
+        this.distinct = distinct;
+        rowSchema = oi;
+
+        for (ExprNodeDesc input : inputs) {
+            TypeInfo type = input.getTypeInfo();
+            if (type instanceof StructTypeInfo) {
+                types.add(TypeInfoFactory.doubleTypeInfo);
+            } else
+                types.add(type);
+
+            String s = Utilities.serializeExpression(input);
+            parametersSerialization.add(s);
+        }
+    }
+
+    @Override
+    public synchronized ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+        if (parametersOrigin == null) {
+            Configuration config = new Configuration();
+            config.setClassLoader(this.getClass().getClassLoader());
+            /**
+             * in case of Class.forName(...) calls in Hive code
+             */
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+            parametersOrigin = new ArrayList<ExprNodeDesc>();
+            for (String serialization : parametersSerialization) {
+                parametersOrigin.add(Utilities.deserializeExpression(serialization, config));
+            }
+        }
+
+        /**
+         * exprs
+         */
+        if (parameterExprs == null)
+            parameterExprs = new HashMap<Long, List<ExprNodeDesc>>();
+
+        /**
+         * evaluators
+         */
+        if (evaluators == null)
+            evaluators = new HashMap<Long, ExprNodeEvaluator[]>();
+
+        /**
+         * cached parameter objects
+         */
+        if (cachedParameters == null)
+            cachedParameters = new HashMap<Long, Object[]>();
+
+        /**
+         * cached row object: one per thread
+         */
+        if (cachedRowObjects == null)
+            cachedRowObjects = new HashMap<Long, LazyObject<? extends ObjectInspector>>();
+
+        /**
+         * we only use lazy serde to do serialization
+         */
+        if (serDe == null)
+            serDe = new HashMap<Long, SerDe>();
+
+        /**
+         * UDAF functions
+         */
+        if (udafsComplete == null)
+            udafsComplete = new HashMap<Long, GenericUDAFEvaluator>();
+
+        /**
+         * UDAF functions
+         */
+        if (udafsPartial == null)
+            udafsPartial = new HashMap<Long, GenericUDAFEvaluator>();
+
+        if (parameterInspectors == null)
+            parameterInspectors = new ObjectInspector[parametersOrigin.size()];
+
+        if (rowInspector == null)
+            rowInspector = rowSchema.toObjectInspector();
+
+        // get current thread id
+        long threadId = Thread.currentThread().getId();
+
+        /**
+         * expressions, expressions are thread local
+         */
+        List<ExprNodeDesc> parameters = parameterExprs.get(threadId);
+        if (parameters == null) {
+            parameters = new ArrayList<ExprNodeDesc>();
+            for (ExprNodeDesc parameter : parametersOrigin)
+                parameters.add(parameter.clone());
+            parameterExprs.put(threadId, parameters);
+        }
+
+        /**
+         * cached parameter objects
+         */
+        Object[] cachedParas = cachedParameters.get(threadId);
+        if (cachedParas == null) {
+            cachedParas = new Object[parameters.size()];
+            cachedParameters.put(threadId, cachedParas);
+        }
+
+        /**
+         * cached row object: one per thread
+         */
+        LazyObject<? extends ObjectInspector> cachedRowObject = cachedRowObjects.get(threadId);
+        if (cachedRowObject == null) {
+            cachedRowObject = LazyFactory.createLazyObject(rowInspector);
+            cachedRowObjects.put(threadId, cachedRowObject);
+        }
+
+        /**
+         * we only use lazy serde to do serialization
+         */
+        SerDe lazySer = serDe.get(threadId);
+        if (lazySer == null) {
+            lazySer = new LazySerDe();
+            serDe.put(threadId, lazySer);
+        }
+
+        /**
+         * evaluators
+         */
+        ExprNodeEvaluator[] evals = evaluators.get(threadId);
+        if (evals == null) {
+            evals = new ExprNodeEvaluator[parameters.size()];
+            evaluators.put(threadId, evals);
+        }
+
+        GenericUDAFEvaluator udafPartial;
+        GenericUDAFEvaluator udafComplete;
+
+        // initialize object inspectors
+        try {
+            /**
+             * evaluators, udf, object inspectors are shared in one thread
+             */
+            for (int i = 0; i < evals.length; i++) {
+                if (evals[i] == null) {
+                    evals[i] = ExprNodeEvaluatorFactory.get(parameters.get(i));
+                    if (parameterInspectors[i] == null) {
+                        parameterInspectors[i] = evals[i].initialize(rowInspector);
+                    } else {
+                        evals[i].initialize(rowInspector);
+                    }
+                }
+            }
+
+            udafComplete = udafsComplete.get(threadId);
+            if (udafComplete == null) {
+                try {
+                    udafComplete = FunctionRegistry.getGenericUDAFEvaluator(genericUDAFName, types, distinct, false);
+                } catch (HiveException e) {
+                    throw new AlgebricksException(e);
+                }
+                udafsComplete.put(threadId, udafComplete);
+                udafComplete.init(mode, parameterInspectors);
+            }
+
+            // multiple stage group by, determined by the mode parameter
+            if (outputInspector == null)
+                outputInspector = udafComplete.init(mode, parameterInspectors);
+
+            // initialize the partial group-by udaf
+            GenericUDAFEvaluator.Mode partialMode;
+            // adjust mode for external groupby
+            if (mode == GenericUDAFEvaluator.Mode.COMPLETE)
+                partialMode = GenericUDAFEvaluator.Mode.PARTIAL1;
+            else if (mode == GenericUDAFEvaluator.Mode.FINAL)
+                partialMode = GenericUDAFEvaluator.Mode.PARTIAL2;
+            else
+                partialMode = mode;
+            udafPartial = udafsPartial.get(threadId);
+            if (udafPartial == null) {
+                try {
+                    udafPartial = FunctionRegistry.getGenericUDAFEvaluator(genericUDAFName, types, distinct, false);
+                } catch (HiveException e) {
+                    throw new AlgebricksException(e);
+                }
+                udafPartial.init(partialMode, parameterInspectors);
+                udafsPartial.put(threadId, udafPartial);
+            }
+
+            // multiple stage group by, determined by the mode parameter
+            if (outputInspectorPartial == null)
+                outputInspectorPartial = udafPartial.init(partialMode, parameterInspectors);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException(e);
+        }
+
+        return new AggregatuibFunctionSerializableEvaluator(parameters, types, genericUDAFName, mode, distinct,
+                rowInspector, evals, parameterInspectors, cachedParas, lazySer, cachedRowObject, udafPartial,
+                udafComplete, outputInspector, outputInspectorPartial);
+    }
+
+    public String toString() {
+        return "aggregation function expression evaluator factory: " + this.genericUDAFName;
+    }
+
+}
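
This factory is shipped to worker nodes via Java serialization, so every cache is transient and arrives null after deserialization; the null checks at the top of createAggregateFunction rebuild them lazily. A minimal sketch of that transient-rebuild idiom:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class TransientRebuildSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Like the caches above: not serialized, so null on the receiving side.
    private transient HashMap<Long, Object> cache = new HashMap<Long, Object>();

    Object lookup(long key) {
        if (cache == null) // rebuild lazily after deserialization
            cache = new HashMap<Long, Object>();
        return cache.get(key);
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new TransientRebuildSketch());
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        TransientRebuildSketch copy = (TransientRebuildSketch) in.readObject();
        System.out.println(copy.lookup(1L)); // prints null; no NPE because cache was rebuilt
    }
}
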
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ColumnExpressionEvaluatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ColumnExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..6f51bfe
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ColumnExpressionEvaluatorFactory.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.ColumnExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ColumnExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private ExprNodeColumnDesc expr;
+
+    private Schema inputSchema;
+
+    public ColumnExpressionEvaluatorFactory(ILogicalExpression expression, Schema schema, IVariableTypeEnvironment env)
+            throws AlgebricksException {
+        try {
+            expr = (ExprNodeColumnDesc) ExpressionTranslator.getHiveExpression(expression, env);
+        } catch (Exception e) {
+            throw new AlgebricksException(e.getMessage());
+        }
+        inputSchema = schema;
+    }
+
+    public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+        return new ColumnExpressionEvaluator(expr, inputSchema.toObjectInspector(), output);
+    }
+
+    public String toString() {
+        return "column expression evaluator factory: " + expr.toString();
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ConstantExpressionEvaluatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ConstantExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..4ecdb70
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ConstantExpressionEvaluatorFactory.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.ConstantExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ConstantExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private ExprNodeConstantDesc expr;
+
+    private Schema schema;
+
+    public ConstantExpressionEvaluatorFactory(ILogicalExpression expression, Schema inputSchema,
+            IVariableTypeEnvironment env) throws AlgebricksException {
+        try {
+            expr = (ExprNodeConstantDesc) ExpressionTranslator.getHiveExpression(expression, env);
+        } catch (Exception e) {
+            throw new AlgebricksException(e.getMessage());
+        }
+        schema = inputSchema;
+    }
+
+    public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+        return new ConstantExpressionEvaluator(expr, schema.toObjectInspector(), output);
+    }
+
+    public String toString() {
+        return "constant expression evaluator factory: " + expr.toString();
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/FieldExpressionEvaluatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/FieldExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..ef0c104
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/FieldExpressionEvaluatorFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.FieldExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class FieldExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private ExprNodeFieldDesc expr;
+
+    private Schema inputSchema;
+
+    public FieldExpressionEvaluatorFactory(ILogicalExpression expression, Schema schema, IVariableTypeEnvironment env)
+            throws AlgebricksException {
+        try {
+            expr = (ExprNodeFieldDesc) ExpressionTranslator.getHiveExpression(expression, env);
+        } catch (Exception e) {
+            throw new AlgebricksException(e.getMessage());
+        }
+        inputSchema = schema;
+    }
+
+    public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+        return new FieldExpressionEvaluator(expr, inputSchema.toObjectInspector(), output);
+    }
+
+    public String toString() {
+        return "field access expression evaluator factory: " + expr.toString();
+    }
+
+}
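
The three evaluator factories above share one shape: translate the Algebricks expression into the matching Hive ExprNodeDesc in the constructor, keep the input Schema, and build an evaluator per call. A generic base class could factor out the duplication; a sketch under that assumption (AbstractHiveEvaluatorFactory is hypothetical, not part of this patch):

import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;

public abstract class AbstractHiveEvaluatorFactory<E extends ExprNodeDesc> implements ICopyEvaluatorFactory {
    private static final long serialVersionUID = 1L;

    protected final E expr;
    protected final Schema inputSchema;

    @SuppressWarnings("unchecked")
    protected AbstractHiveEvaluatorFactory(ILogicalExpression expression, Schema schema, IVariableTypeEnvironment env)
            throws AlgebricksException {
        try {
            // Translate once at job-generation time; the subclass fixes E to
            // its concrete ExprNodeDesc flavor (column, constant, field, ...).
            expr = (E) ExpressionTranslator.getHiveExpression(expression, env);
        } catch (Exception e) {
            throw new AlgebricksException(e.getMessage());
        }
        inputSchema = schema;
    }

    // Each subclass builds its evaluator from expr and inputSchema.
    public abstract ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException;
}

Each concrete factory would then shrink to a constructor call plus a one-line createEvaluator.
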
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/HiveExpressionRuntimeProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/HiveExpressionRuntimeProvider.java
new file mode 100644
index 0000000..c3b4b17
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/HiveExpressionRuntimeProvider.java
@@ -0,0 +1,167 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;

+

+import java.util.ArrayList;

+import java.util.Iterator;

+import java.util.List;

+

+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

+

+import edu.uci.ics.hivesterix.logical.expression.ExpressionConstant;

+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;

+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.AggregateFunctionFactoryAdapter;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.UnnestingFunctionFactoryAdapter;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;

+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;

+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;

+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;

+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;

+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;

+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;

+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;

+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;

+

+public class HiveExpressionRuntimeProvider implements IExpressionRuntimeProvider {

+

+    public static final IExpressionRuntimeProvider INSTANCE = new HiveExpressionRuntimeProvider();

+

+    @Override

+    public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,

+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)

+            throws AlgebricksException {

+        Schema schema = this.getSchema(inputSchemas[0], env);

+        return new AggregateFunctionFactoryAdapter(new AggregationFunctionFactory(expr, schema, env));

+    }

+

+    @Override

+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(

+            AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,

+            JobGenContext context) throws AlgebricksException {

+        Schema schema = this.getSchema(inputSchemas[0], env);

+        return new AggregationFunctionSerializableFactory(expr, schema, env);

+    }

+

+    @Override

+    public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,

+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)

+            throws AlgebricksException {

+        return null;

+    }

+

+    @Override

+    public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,

+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)

+            throws AlgebricksException {

+        Schema schema = this.getSchema(inputSchemas[0], env);

+        return new UnnestingFunctionFactoryAdapter(new UnnestingFunctionFactory(expr, schema, env));

+    }

+

+    public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        switch (expr.getExpressionTag()) {
+            case VARIABLE: {
+                VariableReferenceExpression v = (VariableReferenceExpression) expr;
+                return new ScalarEvaluatorFactoryAdapter(createVariableEvaluatorFactory(v, env, inputSchemas, context));
+            }
+            case CONSTANT: {
+                ConstantExpression c = (ConstantExpression) expr;
+                return new ScalarEvaluatorFactoryAdapter(createConstantEvaluatorFactory(c, env, inputSchemas, context));
+            }
+            case FUNCTION_CALL: {
+                AbstractFunctionCallExpression fun = (AbstractFunctionCallExpression) expr;
+                FunctionIdentifier fid = fun.getFunctionIdentifier();
+
+                if (fid.getName().equals(ExpressionConstant.FIELDACCESS)) {
+                    return new ScalarEvaluatorFactoryAdapter(createFieldExpressionEvaluatorFactory(fun, env,
+                            inputSchemas, context));
+                }
+
+                if (fid.getName().equals(ExpressionConstant.NULL)) {
+                    return new ScalarEvaluatorFactoryAdapter(createNullExpressionEvaluatorFactory(fun, env,
+                            inputSchemas, context));
+                }
+
+                if (fun.getKind() == FunctionKind.SCALAR) {
+                    ScalarFunctionCallExpression scalar = (ScalarFunctionCallExpression) fun;
+                    return new ScalarEvaluatorFactoryAdapter(createScalarFunctionEvaluatorFactory(scalar, env,
+                            inputSchemas, context));
+                } else {
+                    throw new AlgebricksException("Cannot create evaluator for function " + fun + " of kind "
+                            + fun.getKind());
+                }
+            }
+            default: {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+
+    private ICopyEvaluatorFactory createVariableEvaluatorFactory(VariableReferenceExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        Schema schema = this.getSchema(inputSchemas[0], env);
+        return new ColumnExpressionEvaluatorFactory(expr, schema, env);
+    }
+
+    private ICopyEvaluatorFactory createScalarFunctionEvaluatorFactory(AbstractFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        List<String> names = new ArrayList<String>();
+        List<TypeInfo> types = new ArrayList<TypeInfo>();
+        for (IOperatorSchema inputSchema : inputSchemas) {
+            Schema schema = this.getSchema(inputSchema, env);
+            names.addAll(schema.getNames());
+            types.addAll(schema.getTypes());
+        }
+        Schema inputSchema = new Schema(names, types);
+        return new ScalarFunctionExpressionEvaluatorFactory(expr, inputSchema, env);
+    }
+
+    private ICopyEvaluatorFactory createFieldExpressionEvaluatorFactory(AbstractFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        Schema schema = this.getSchema(inputSchemas[0], env);
+        return new FieldExpressionEvaluatorFactory(expr, schema, env);
+    }
+
+    private ICopyEvaluatorFactory createNullExpressionEvaluatorFactory(AbstractFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        Schema schema = this.getSchema(inputSchemas[0], env);
+        return new NullExpressionEvaluatorFactory(expr, schema, env);
+    }
+
+    private ICopyEvaluatorFactory createConstantEvaluatorFactory(ConstantExpression expr, IVariableTypeEnvironment env,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        Schema schema = this.getSchema(inputSchemas[0], env);
+        return new ConstantExpressionEvaluatorFactory(expr, schema, env);
+    }
+
+    private Schema getSchema(IOperatorSchema inputSchema, IVariableTypeEnvironment env) throws AlgebricksException {
+        List<String> names = new ArrayList<String>();
+        List<TypeInfo> types = new ArrayList<TypeInfo>();
+        Iterator<LogicalVariable> variables = inputSchema.iterator();
+        while (variables.hasNext()) {
+            LogicalVariable var = variables.next();
+            names.add(var.toString());
+            types.add((TypeInfo) env.getVarType(var));
+        }
+
+        Schema schema = new Schema(names, types);
+        return schema;
+    }
+
+}
\ No newline at end of file
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/NullExpressionEvaluatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/NullExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..f37b825
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/NullExpressionEvaluatorFactory.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.NullExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class NullExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private ExprNodeNullDesc expr;
+
+    private Schema schema;
+
+    public NullExpressionEvaluatorFactory(ILogicalExpression expression, Schema inputSchema,
+            IVariableTypeEnvironment env) throws AlgebricksException {
+        try {
+            expr = (ExprNodeNullDesc) ExpressionTranslator.getHiveExpression(expression, env);
+        } catch (Exception e) {
+            throw new AlgebricksException(e.getMessage());
+        }
+        schema = inputSchema;
+    }
+
+    public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+        return new NullExpressionEvaluator(expr, schema.toObjectInspector(), output);
+    }
+
+    public String toString() {
+        return "null expression evaluator factory: " + expr.toString();
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ScalarFunctionExpressionEvaluatorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ScalarFunctionExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..cbac10a
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ScalarFunctionExpressionEvaluatorFactory.java
@@ -0,0 +1,69 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.FunctionExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ScalarFunctionExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient ExprNodeGenericFuncDesc expr;
+
+    private String exprSerialization;
+
+    private Schema inputSchema;
+
+    private transient Configuration config;
+
+    public ScalarFunctionExpressionEvaluatorFactory(ILogicalExpression expression, Schema schema,
+            IVariableTypeEnvironment env) throws AlgebricksException {
+        try {
+            expr = (ExprNodeGenericFuncDesc) ExpressionTranslator.getHiveExpression(expression, env);
+
+            exprSerialization = Utilities.serializeExpression(expr);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException(e.getMessage());
+        }
+        inputSchema = schema;
+    }
+
+    public synchronized ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+        if (expr == null) {
+            configClassLoader();
+            expr = (ExprNodeGenericFuncDesc) Utilities.deserializeExpression(exprSerialization, config);
+        }
+
+        ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) expr.clone();
+        return new FunctionExpressionEvaluator(funcDesc, inputSchema.toObjectInspector(), output);
+    }
+
+    private void configClassLoader() {
+        config = new Configuration();
+        ClassLoader loader = this.getClass().getClassLoader();
+        config.setClassLoader(loader);
+        Thread.currentThread().setContextClassLoader(loader);
+    }
+
+    public String toString() {
+        if (expr == null) {
+            configClassLoader();
+            expr = (ExprNodeGenericFuncDesc) Utilities.deserializeExpression(exprSerialization, config);
+        }
+
+        return "function expression evaluator factory: " + expr.getExprString();
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/UnnestingFunctionFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/UnnestingFunctionFactory.java
new file mode 100644
index 0000000..3b22513
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/UnnestingFunctionFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.UDTFFunctionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class UnnestingFunctionFactory implements ICopyUnnestingFunctionFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private UDTFDesc expr;
+
+    private Schema inputSchema;
+
+    private int[] columns;
+
+    public UnnestingFunctionFactory(ILogicalExpression expression, Schema schema, IVariableTypeEnvironment env)
+            throws AlgebricksException {
+        try {
+            expr = (UDTFDesc) ExpressionTranslator.getHiveExpression(expression, env);
+        } catch (Exception e) {
+            throw new AlgebricksException(e.getMessage());
+        }
+        inputSchema = schema;
+    }
+
+    @Override
+    public ICopyUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException {
+        return new UDTFFunctionEvaluator(expr, inputSchema, columns, provider.getDataOutput());
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveDoubleBinaryHashFunctionFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveDoubleBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..b636009
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveDoubleBinaryHashFunctionFactory.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveDoubleBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static HiveDoubleBinaryHashFunctionFactory INSTANCE = new HiveDoubleBinaryHashFunctionFactory();
+
+    private HiveDoubleBinaryHashFunctionFactory() {
+    }
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction() {
+        // hash the 8-byte IEEE-754 representation via Double.hashCode()
+        return new IBinaryHashFunction() {
+            private Double value;
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                value = Double.longBitsToDouble(LazyUtils.byteArrayToLong(bytes, offset));
+                return value.hashCode();
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveIntegerBinaryHashFunctionFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveIntegerBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..90e6ce4
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveIntegerBinaryHashFunctionFactory.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveIntegerBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static IBinaryHashFunctionFactory INSTANCE = new HiveIntegerBinaryHashFunctionFactory();
+
+    private HiveIntegerBinaryHashFunctionFactory() {
+    }
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction() {
+
+        return new IBinaryHashFunction() {
+            private VInt value = new VInt();
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                LazyUtils.readVInt(bytes, offset, value);
+                if (value.length != length)
+                    throw new IllegalArgumentException("length mismatch in int hash function actual: " + length
+                            + " expected " + value.length);
+                return value.value;
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveLongBinaryHashFunctionFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveLongBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..1b61f67
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveLongBinaryHashFunctionFactory.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VLong;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveLongBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static IBinaryHashFunctionFactory INSTANCE = new HiveLongBinaryHashFunctionFactory();
+
+    private HiveLongBinaryHashFunctionFactory() {
+    }
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction() {
+
+        return new IBinaryHashFunction() {
+            private VLong value = new VLong();
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                LazyUtils.readVLong(bytes, offset, value);
+                return (int) value.value;
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveRawBinaryHashFunctionFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveRawBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..f2b7b44
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveRawBinaryHashFunctionFactory.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveRawBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static IBinaryHashFunctionFactory INSTANCE = new HiveRawBinaryHashFunctionFactory();
+
+    private HiveRawBinaryHashFunctionFactory() {
+
+    }
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction() {
+
+        return new IBinaryHashFunction() {
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                int value = 1;
+                int end = offset + length;
+                for (int i = offset; i < end; i++)
+                    value = value * 31 + (int) bytes[i];
+                return value;
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveStingBinaryHashFunctionFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveStingBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..a9cf6fd
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveStingBinaryHashFunctionFactory.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveStingBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static HiveStingBinaryHashFunctionFactory INSTANCE = new HiveStingBinaryHashFunctionFactory();
+
+    private HiveStingBinaryHashFunctionFactory() {
+    }
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction() {
+        // skip the VInt length header, then hash the UTF-8 payload bytes
+        return new IBinaryHashFunction() {
+            private VInt len = new VInt();
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                LazyUtils.readVInt(bytes, offset, len);
+                if (len.value + len.length != length)
+                    throw new IllegalStateException("parse string: length mismatch, expected "
+                            + (len.value + len.length) + " but got " + length);
+                return hashBytes(bytes, offset + len.length, length - len.length);
+            }
+
+            public int hashBytes(byte[] bytes, int offset, int length) {
+                int value = 1;
+                int end = offset + length;
+                for (int i = offset; i < end; i++)
+                    value = value * 31 + (int) bytes[i];
+                return value;
+            }
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java
new file mode 100644
index 0000000..760a614
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
+
+    public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
+
+    private static final long serialVersionUID = 1L;
+
+    private MurmurHash3BinaryHashFunctionFamily() {
+    }
+
+    private static final int C1 = 0xcc9e2d51;
+    private static final int C2 = 0x1b873593;
+    private static final int C3 = 5;
+    private static final int C4 = 0xe6546b64;
+    private static final int C5 = 0x85ebca6b;
+    private static final int C6 = 0xc2b2ae35;
+
+    @Override
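+    /**
+     * MurmurHash3 (x86 32-bit variant): each 4-byte block is mixed in with
+     * the C1/C2 multiply-rotate steps, tail bytes are folded in the same
+     * way, and C5/C6 implement the final avalanche.
+     */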
+    public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+        return new IBinaryHashFunction() {
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                int h = seed;
+                int p = offset;
+                int remain = length;
+                while (remain >= 4) {
+                    int k = (bytes[p] & 0xff) | ((bytes[p + 1] & 0xff) << 8) | ((bytes[p + 2] & 0xff) << 16)
+                            | ((bytes[p + 3] & 0xff) << 24);
+                    k *= C1;
+                    k = Integer.rotateLeft(k, 15);
+                    k *= C2;
+                    h ^= k;
+                    h = Integer.rotateLeft(h, 13);
+                    h = h * C3 + C4;
+                    p += 4;
+                    remain -= 4;
+                }
+                if (remain > 0) {
+                    int k = 0;
+                    for (int i = 0; remain > 0; i += 8) {
+                        k ^= (bytes[p++] & 0xff) << i;
+                        remain--;
+                    }
+                    k *= C1;
+                    k = Integer.rotateLeft(k, 15);
+                    k *= C2;
+                    h ^= k;
+                }
+                h ^= length;
+                h ^= (h >>> 16);
+                h *= C5;
+                h ^= (h >>> 13);
+                h *= C6;
+                h ^= (h >>> 16);
+                return h;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleAscNormalizedKeyComputerFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..6ac012f
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveDoubleAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+        return new INormalizedKeyComputer() {
+
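+            // A normalized key is only an order-preserving 32-bit prefix of
+            // the value; the full comparator resolves ties.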
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                int header = LazyUtils.byteArrayToInt(bytes, start);
+                long unsignedValue = (long) header;
+                return (int) ((unsignedValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleDescNormalizedKeyComputerFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..3044109
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveDoubleDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final INormalizedKeyComputerFactory ascNormalizedKeyComputerFactory = new HiveDoubleAscNormalizedKeyComputerFactory();
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private INormalizedKeyComputer nmkComputer = ascNormalizedKeyComputerFactory.createNormalizedKeyComputer();
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                int nk = nmkComputer.normalize(bytes, start, length);
+                return (int) ((long) Integer.MAX_VALUE - (long) (nk - Integer.MIN_VALUE));
+            }
+
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerAscNormalizedKeyComputerFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..a1d4d48
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveIntegerAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+        return new INormalizedKeyComputer() {
+            private VInt vint = new VInt();
+
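+            // Subtracting Integer.MIN_VALUE shifts the signed int range onto
+            // [0, 2^32), so ascending order is preserved in the key.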
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                LazyUtils.readVInt(bytes, start, vint);
+                if (vint.length != length)
+                    throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+                            + vint.length + " expected " + length);
+                long unsignedValue = (long) vint.value;
+                return (int) ((unsignedValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerDescNormalizedKeyComputerFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..b8a30a8
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveIntegerDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+        return new INormalizedKeyComputer() {
+            private VInt vint = new VInt();
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                LazyUtils.readVInt(bytes, start, vint);
+                if (vint.length != length)
+                    throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+                            + vint.length + " expected " + length);
+                long unsignedValue = (long) vint.value;
+                return (int) ((long) 0xffffffff - unsignedValue);
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongAscNormalizedKeyComputerFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..a893d19
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,66 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VLong;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveLongAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+        return new INormalizedKeyComputer() {
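+            // The top two bits tag the magnitude class (negative long,
+            // [0, 2^32), larger positive); assuming normalized keys compare
+            // as unsigned ints, the classes stay in value order.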
+            private static final int POSITIVE_LONG_MASK = (3 << 30);
+            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+            private VLong vlong = new VLong();
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                LazyUtils.readVLong(bytes, start, vlong);
+                if (vlong.length != length)
+                    throw new IllegalArgumentException("length mismatch in long comparator function actual: "
+                            + vlong.length + " expected " + length);
+                long value = (long) vlong.value;
+                int highValue = (int) (value >> 32);
+                if (highValue > 0) {
+                    /**
+                     * the high 32 bits are positive: value >= 2^32
+                     */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= POSITIVE_LONG_MASK;
+                    return highNmk;
+                } else if (highValue == 0) {
+                    /**
+                     * the high 32 bits are zero: 0 <= value < 2^32
+                     */
+                    int lowNmk = (int) value;
+                    lowNmk >>= 2;
+                    lowNmk |= NON_NEGATIVE_INT_MASK;
+                    return lowNmk;
+                } else {
+                    /**
+                     * negative value; TODO: not yet optimized for this case
+                     */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= NEGATIVE_LONG_MASK;
+                    return highNmk;
+                }
+            }
+
+            private int getKey(int value) {
+                long unsignedFirstValue = (long) value;
+                int nmk = (int) ((unsignedFirstValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+                return nmk;
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongDescNormalizedKeyComputerFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..cc5661b
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveLongDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final INormalizedKeyComputerFactory ascNormalizedKeyComputerFactory = new HiveLongAscNormalizedKeyComputerFactory();
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private INormalizedKeyComputer nmkComputer = ascNormalizedKeyComputerFactory.createNormalizedKeyComputer();
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                int nk = nmkComputer.normalize(bytes, start, length);
+                return (int) ((long) Integer.MAX_VALUE - (long) (nk - Integer.MIN_VALUE));
+            }
+
+        };
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringAscNormalizedKeyComputerFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..d0429d6
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class HiveStringAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+        return new INormalizedKeyComputer() {
+            private VInt len = new VInt();
+
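+            // Only the first two UTF-8 characters fit into the 32-bit key,
+            // so it orders prefixes; the full comparator breaks ties.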
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                LazyUtils.readVInt(bytes, start, len);
+
+                if (len.value + len.length != length)
+                    throw new IllegalStateException("parse string: length mismatch, expected "
+                            + (len.value + len.length) + " but got " + length);
+                int nk = 0;
+                int offset = start + len.length;
+                for (int i = 0; i < 2; ++i) {
+                    nk <<= 16;
+                    if (i < len.value) {
+                        char character = UTF8StringPointable.charAt(bytes, offset);
+                        nk += ((int) character) & 0xffff;
+                        offset += UTF8StringPointable.charSize(bytes, offset);
+                    }
+                }
+                return nk;
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringDescNormalizedKeyComputerFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..15b2d27
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,37 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class HiveStringDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private VInt len = new VInt();
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                LazyUtils.readVInt(bytes, start, len);
+                if (len.value + len.length != length)
+                    throw new IllegalStateException("parse string: length mismatch, expected "
+                            + (len.value + len.length) + " but got " + length);
+                int nk = 0;
+                int offset = start + len.length;
+                for (int i = 0; i < 2; ++i) {
+                    nk <<= 16;
+                    if (i < len.value) {
+                        nk += ((int) UTF8StringPointable.charAt(bytes, offset)) & 0xffff;
+                        offset += UTF8StringPointable.charSize(bytes, offset);
+                    }
+                }
+                return (int) ((long) 0xffffffff - (long) nk);
+            }
+        };
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/nullwriter/HiveNullWriterFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/nullwriter/HiveNullWriterFactory.java
new file mode 100644
index 0000000..590bd61
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/nullwriter/HiveNullWriterFactory.java
@@ -0,0 +1,28 @@
+package edu.uci.ics.hivesterix.runtime.factory.nullwriter;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class HiveNullWriterFactory implements INullWriterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static HiveNullWriterFactory INSTANCE = new HiveNullWriterFactory();
+
+    @Override
+    public INullWriter createNullWriter() {
+        return new HiveNullWriter();
+    }
+}
+
+class HiveNullWriter implements INullWriter {
+
+    @Override
+    public void writeNull(DataOutput out) throws HyracksDataException {
+        // do nothing
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspector.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspector.java
new file mode 100644
index 0000000..677e20e
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspector.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hivesterix.runtime.inspector;
+
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+
+public class HiveBinaryBooleanInspector implements IBinaryBooleanInspector {
+
+    HiveBinaryBooleanInspector() {
+    }
+
+    @Override
+    public boolean getBooleanValue(byte[] bytes, int offset, int length) {
+        if (length == 0)
+            return false;
+        if (length != 1)
+            throw new IllegalStateException("boolean field error: with length " + length);
+        return bytes[offset] == 1;
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspectorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspectorFactory.java
new file mode 100644
index 0000000..22a6065
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspectorFactory.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hivesterix.runtime.inspector;
+
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class HiveBinaryBooleanInspectorFactory implements IBinaryBooleanInspectorFactory {
+    private static final long serialVersionUID = 1L;
+    public static HiveBinaryBooleanInspectorFactory INSTANCE = new HiveBinaryBooleanInspectorFactory();
+
+    private HiveBinaryBooleanInspectorFactory() {
+
+    }
+
+    @Override
+    public IBinaryBooleanInspector createBinaryBooleanInspector(IHyracksTaskContext arg0) {
+        return new HiveBinaryBooleanInspector();
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspector.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspector.java
new file mode 100644
index 0000000..555afee
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspector.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.hivesterix.runtime.inspector;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+
+public class HiveBinaryIntegerInspector implements IBinaryIntegerInspector {
+    private VInt value = new VInt();
+
+    HiveBinaryIntegerInspector() {
+    }
+
+    @Override
+    public int getIntegerValue(byte[] bytes, int offset, int length) {
+        LazyUtils.readVInt(bytes, offset, value);
+        if (value.length != length)
+            throw new IllegalArgumentException("length mismatch in int inspector actual: " + length + " expected "
+                    + value.length);
+        return value.value;
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspectorFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspectorFactory.java
new file mode 100644
index 0000000..bb93a60
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspectorFactory.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hivesterix.runtime.inspector;
+
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class HiveBinaryIntegerInspectorFactory implements IBinaryIntegerInspectorFactory {
+    private static final long serialVersionUID = 1L;
+    public static HiveBinaryIntegerInspectorFactory INSTANCE = new HiveBinaryIntegerInspectorFactory();
+
+    private HiveBinaryIntegerInspectorFactory() {
+
+    }
+
+    @Override
+    public IBinaryIntegerInspector createBinaryIntegerInspector(IHyracksTaskContext arg0) {
+        return new HiveBinaryIntegerInspector();
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveConnectorPolicyAssignmentPolicy.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..cfceb26
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedBlockingConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class HiveConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
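+    // The policy chosen at construction decides how repartitioning
+    // connectors exchange data: fully pipelined, materialized on the send
+    // side, or materialized on both sides (the most conservative option).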
+    public enum Policy {
+        PIPELINING,
+        SEND_SIDE_MAT_PIPELINING,
+        SEND_SIDE_MAT_BLOCKING,
+        SEND_SIDE_MAT_RECEIVE_SIDE_MAT_BLOCKING;
+    };
+
+    private static final long serialVersionUID = 1L;
+
+    private final IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+    private final IConnectorPolicy sendSideMatPipeliningPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private final IConnectorPolicy sendSideMatBlockingPolicy = new SendSideMaterializedBlockingConnectorPolicy();
+    private final IConnectorPolicy sendSideMatReceiveSideMatBlockingPolicy = new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
+    private final Policy policy;
+
+    public HiveConnectorPolicyAssignmentPolicy(Policy policy) {
+        this.policy = policy;
+    }
+
+    @Override
+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts) {
+        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+            // avoid deadlocks
+            switch (policy) {
+                case PIPELINING:
+                case SEND_SIDE_MAT_PIPELINING:
+                    return sendSideMatPipeliningPolicy;
+                case SEND_SIDE_MAT_BLOCKING:
+                    return sendSideMatBlockingPolicy;
+                case SEND_SIDE_MAT_RECEIVE_SIDE_MAT_BLOCKING:
+                    return sendSideMatReceiveSideMatBlockingPolicy;
+                default:
+                    return sendSideMatPipeliningPolicy;
+            }
+        } else if (c instanceof MToNPartitioningConnectorDescriptor) {
+            // support different repartitioning policies
+            switch (policy) {
+                case PIPELINING:
+                    return pipeliningPolicy;
+                case SEND_SIDE_MAT_PIPELINING:
+                    return sendSideMatPipeliningPolicy;
+                case SEND_SIDE_MAT_BLOCKING:
+                    return sendSideMatBlockingPolicy;
+                case SEND_SIDE_MAT_RECEIVE_SIDE_MAT_BLOCKING:
+                    return sendSideMatReceiveSideMatBlockingPolicy;
+                default:
+                    return pipeliningPolicy;
+            }
+        } else {
+            // pipelining for other connectors
+            return pipeliningPolicy;
+        }
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSink.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSink.java
new file mode 100644
index 0000000..ccc2e6c
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSink.java
@@ -0,0 +1,32 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+
+public class HiveDataSink implements IDataSink {
+
+    private Object[] schema;
+
+    private Object fsOperator;
+
+    public HiveDataSink(Object sink, Object[] sourceSchema) {
+        schema = sourceSchema;
+        fsOperator = sink;
+    }
+
+    @Override
+    public Object getId() {
+        return fsOperator;
+    }
+
+    @Override
+    public Object[] getSchemaTypes() {
+        return schema;
+    }
+
+    public IPartitioningProperty getPartitioningProperty() {
+        return new RandomPartitioningProperty(new HiveDomain());
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSource.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSource.java
new file mode 100644
index 0000000..67b743b
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSource.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+
+public class HiveDataSource<P> implements IDataSource<P> {
+
+    private P source;
+
+    private Object[] schema;
+
+    public HiveDataSource(P dataSource, Object[] sourceSchema) {
+        source = dataSource;
+        schema = sourceSchema;
+    }
+
+    @Override
+    public P getId() {
+        return source;
+    }
+
+    @Override
+    public Object[] getSchemaTypes() {
+        return schema;
+    }
+
+    @Override
+    public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
+    }
+
+    @Override
+    public IDataSourcePropertiesProvider getPropertiesProvider() {
+        return new HiveDataSourcePartitioningProvider();
+    }
+
+    @Override
+    public String toString() {
+        PartitionDesc desc = (PartitionDesc) source;
+        return desc.getTableName();
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSourcePartitioningProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSourcePartitioningProvider.java
new file mode 100644
index 0000000..bb9c4ce
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSourcePartitioningProvider.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+public class HiveDataSourcePartitioningProvider implements IDataSourcePropertiesProvider {
+
+    @Override
+    public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
+        IPartitioningProperty property = new RandomPartitioningProperty(new HiveDomain());
+        IPhysicalPropertiesVector vector = new StructuralPropertiesVector(property,
+                new LinkedList<ILocalStructuralProperty>());
+        return vector;
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDomain.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDomain.java
new file mode 100644
index 0000000..8b1d3b5
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDomain.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class HiveDomain implements INodeDomain {
+
+    @Override
+    public boolean sameAs(INodeDomain domain) {
+        return true;
+    }
+
+    @Override
+    public Integer cardinality() {
+        return 0;
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
new file mode 100644
index 0000000..daf6a7f
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -0,0 +1,139 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.HiveFunctionInfo;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveScanRuntimeGenerator;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveWriteRuntimeGenerator;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+@SuppressWarnings("rawtypes")
+public class HiveMetaDataProvider<S, T> implements IMetadataProvider<S, T> {
+
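+    // Only scans and file writes are wired up; the insert/delete, result
+    // and index runtimes below are unimplemented stubs that return null.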
+    private Operator fileSink;
+    private Schema outputSchema;
+    private HashMap<S, IDataSource<S>> dataSourceMap;
+
+    public HiveMetaDataProvider(Operator fsOp, Schema oi, HashMap<S, IDataSource<S>> map) {
+        fileSink = fsOp;
+        outputSchema = oi;
+        dataSourceMap = map;
+    }
+
+    @Override
+    public IDataSourceIndex<T, S> findDataSourceIndex(T indexId, S dataSourceId) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public IDataSource<S> findDataSource(S id) throws AlgebricksException {
+        return dataSourceMap.get(id);
+    }
+
+    @Override
+    public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource) {
+        return true;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
+            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
+            throws AlgebricksException {
+
+        S desc = dataSource.getId();
+        HiveScanRuntimeGenerator generator = new HiveScanRuntimeGenerator((PartitionDesc) desc);
+        return generator.getRuntimeOperatorAndConstraint(dataSource, scanVariables, projectVariables, projectPushed,
+                context, jobSpec);
+    }
+
+    @Override
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+
+        HiveWriteRuntimeGenerator generator = new HiveWriteRuntimeGenerator((FileSinkOperator) fileSink, outputSchema);
+        return generator.getWriterRuntime(inputDesc);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> arg0,
+            IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,
+            JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> arg0,
+            IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,
+            JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> arg0,
+            IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, JobGenContext arg4,
+            JobSpecification arg5) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public IFunctionInfo lookupFunction(FunctionIdentifier arg0) {
+        return new HiveFunctionInfo(arg0, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+            IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+            IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,

+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,

+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)

+            throws AlgebricksException {

+        // TODO Auto-generated method stub

+        return null;

+    }

+

+}

diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveOperatorSchema.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveOperatorSchema.java
new file mode 100644
index 0000000..cdb0e95
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveOperatorSchema.java
@@ -0,0 +1,88 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+
+public class HiveOperatorSchema implements IOperatorSchema {
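+
+    /*
+     * A variable-to-index map and an insertion-ordered variable list are kept
+     * in sync: findVariable() is O(1) and iteration follows column order.
+     */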
+    private final Map<LogicalVariable, Integer> varMap;
+
+    private final List<LogicalVariable> varList;
+
+    public HiveOperatorSchema() {
+        varMap = new HashMap<LogicalVariable, Integer>();
+        varList = new ArrayList<LogicalVariable>();
+    }
+
+    @Override
+    public void addAllVariables(IOperatorSchema source) {
+        for (LogicalVariable v : source) {
+            varMap.put(v, varList.size());
+            varList.add(v);
+        }
+    }
+
+    @Override
+    public void addAllNewVariables(IOperatorSchema source) {
+        for (LogicalVariable v : source) {
+            if (varMap.get(v) == null) {
+                varMap.put(v, varList.size());
+                varList.add(v);
+            }
+        }
+    }
+
+    @Override
+    public int addVariable(LogicalVariable var) {
+        int idx = varList.size();
+        varMap.put(var, idx);
+        varList.add(var);
+        return idx;
+    }
+
+    @Override
+    public void clear() {
+        varMap.clear();
+        varList.clear();
+    }
+
+    @Override
+    public int findVariable(LogicalVariable var) {
+        Integer i = varMap.get(var);
+        if (i == null) {
+            return -1;
+        }
+        return i;
+    }
+
+    @Override
+    public int getSize() {
+        return varList.size();
+    }
+
+    @Override
+    public LogicalVariable getVariable(int index) {
+        return varList.get(index);
+    }
+
+    @Override
+    public Iterator<LogicalVariable> iterator() {
+        return varList.iterator();
+    }
+
+    @Override
+    public String toString() {
+        return varMap.toString();
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
new file mode 100644
index 0000000..5e4e21e
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
@@ -0,0 +1,118 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.common.config.ConfUtil;
+import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveKeyValueParserFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+@SuppressWarnings({ "rawtypes", "deprecation" })
+public class HiveScanRuntimeGenerator {
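+
+    /*
+     * Compiles one Hive table partition into an HDFSReadOperatorDescriptor
+     * whose input splits are scheduled close to their HDFS block locations.
+     */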
+    private PartitionDesc fileDesc;
+
+    private transient Path filePath;
+
+    private String filePathName;
+
+    private Properties properties;
+
+    public HiveScanRuntimeGenerator(PartitionDesc path) {
+        fileDesc = path;
+        properties = fileDesc.getProperties();
+
+        String inputPath = properties.getProperty("location");
+
+        if (inputPath.startsWith("file:")) {
+            // Windows: strip the "file:" (and any drive) prefix
+            String[] strs = inputPath.split(":");
+            filePathName = strs[strs.length - 1];
+        } else {
+            // Linux: the location is already a plain path
+            filePathName = inputPath;
+        }
+
+        filePath = new Path(filePathName);
+    }
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRuntimeOperatorAndConstraint(
+            IDataSource dataSource, List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables,
+            boolean projectPushed, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+        try {
+            // build the schema this scan propagates; projection is pushed into the scan
+            IOperatorSchema propagatedSchema = new HiveOperatorSchema();
+
+            List<LogicalVariable> outputVariables = projectPushed ? projectVariables : scanVariables;
+            for (LogicalVariable var : outputVariables)
+                propagatedSchema.addVariable(var);
+
+            int[] outputColumnsOffset = new int[scanVariables.size()];
+            int i = 0;
+            for (LogicalVariable var : scanVariables)
+                if (outputVariables.contains(var)) {
+                    int offset = outputVariables.indexOf(var);
+                    outputColumnsOffset[i++] = offset;
+                } else
+                    outputColumnsOffset[i++] = -1;
+
+            Object[] schemaTypes = dataSource.getSchemaTypes();
+            // get record descriptor
+            RecordDescriptor recDescriptor = mkRecordDescriptor(propagatedSchema, schemaTypes, context);
+
+            // set up the runtime operator and its location constraints
+            JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);
+            String[] locConstraints = ConfUtil.getNCs();
+            Map<String, NodeControllerInfo> ncNameToNcInfos = ConfUtil.getNodeControllerInfo();
+            ClusterTopology topology = ConfUtil.getClusterTopology();
+            Scheduler scheduler = new Scheduler(ncNameToNcInfos, topology);
+            InputSplit[] splits = conf.getInputFormat().getSplits(conf, locConstraints.length);
+            String[] schedule = scheduler.getLocationConstraints(splits);
+            IOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(jobSpec, recDescriptor, conf, splits,
+                    schedule, new HiveKeyValueParserFactory(fileDesc, conf, outputColumnsOffset));
+
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner,
+                    new AlgebricksAbsolutePartitionConstraint(locConstraints));
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    private static RecordDescriptor mkRecordDescriptor(IOperatorSchema opSchema, Object[] types, JobGenContext context)
+            throws AlgebricksException {
+        ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
+        ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
+        int size = opSchema.getSize();
+        for (int i = 0; i < size; i++) {
+            fields[i] = sdp.getSerializerDeserializer(types[i]);
+        }
+        return new RecordDescriptor(fields);
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveWriteRuntimeGenerator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveWriteRuntimeGenerator.java
new file mode 100644
index 0000000..7a577e8
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveWriteRuntimeGenerator.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.common.config.ConfUtil;
+import edu.uci.ics.hivesterix.runtime.operator.filewrite.HivePushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
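+/**
+ * Wraps the plan's FileSinkOperator and its input Schema into a
+ * HivePushRuntimeFactory during job generation.
+ */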
+@SuppressWarnings("deprecation")
+public class HiveWriteRuntimeGenerator {
+    private FileSinkOperator fileSink;
+
+    private Schema inputSchema;
+
+    public HiveWriteRuntimeGenerator(FileSinkOperator fsOp, Schema oi) {
+        fileSink = fsOp;
+        inputSchema = oi;
+    }
+
+    /**
+     * Builds the write-side push runtime.
+     *
+     * @param inputDesc record descriptor of the incoming frames
+     * @return the runtime factory, paired with a null partition constraint
+     *         (no specific placement is requested)
+     */
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriterRuntime(RecordDescriptor inputDesc) {
+        JobConf conf = ConfUtil.getJobConf();
+        IPushRuntimeFactory factory = new HivePushRuntimeFactory(inputDesc, conf, fileSink, inputSchema);
+        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> pair = new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(
+                factory, null);
+        return pair;
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/Schema.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/Schema.java
new file mode 100644
index 0000000..927c709
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/Schema.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+
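+/**
+ * A serializable (field names, field types) pair that travels with the job
+ * and rebuilds lazy ObjectInspectors on the evaluation side.
+ */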
+public class Schema implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private List<String> fieldNames;
+
+    private List<TypeInfo> fieldTypes;
+
+    public Schema(List<String> fieldNames, List<TypeInfo> fieldTypes) {
+        this.fieldNames = fieldNames;
+        this.fieldTypes = fieldTypes;
+    }
+
+    public ObjectInspector toObjectInspector() {
+        return LazyUtils.getLazyObjectInspector(fieldNames, fieldTypes);
+    }
+
+    public List<String> getNames() {
+        return fieldNames;
+    }
+
+    public List<TypeInfo> getTypes() {
+        return fieldTypes;
+    }
+
+    public Object[] getSchema() {
+        return fieldTypes.toArray();
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
new file mode 100644
index 0000000..472994a
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
@@ -0,0 +1,215 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hivesterix.serde.parser.IHiveParser;
+import edu.uci.ics.hivesterix.serde.parser.TextToBinaryTupleParser;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+
+@SuppressWarnings("deprecation")
+public class HiveKeyValueParser<K, V> implements IKeyValueParser<K, V> {
+    /**
+     * the columns to output: projection is pushed into this scan
+     */
+    private int[] outputColumnsOffset;
+
+    /**
+     * serialization/de-serialization object
+     */
+    private SerDe serDe;
+
+    /**
+     * the input row object inspector
+     */
+    private StructObjectInspector structInspector;
+
+    /**
+     * the hadoop job conf
+     */
+    private JobConf job;
+
+    /**
+     * Hyracks task context used to allocate frames
+     */
+    private final IHyracksTaskContext ctx;
+
+    /**
+     * lazy serde: format flow in between operators
+     */
+    private final SerDe outputSerDe;
+
+    /**
+     * the parser from hive data to binary data
+     */
+    private IHiveParser parser;
+
+    /**
+     * the buffer for buffering output data
+     */
+    private ByteBuffer buffer;
+
+    /**
+     * the frame tuple appender
+     */
+    private FrameTupleAppender appender;
+
+    /**
+     * the array tuple builder
+     */
+    private ArrayTupleBuilder tb;
+
+    /**
+     * the field references of all fields
+     */
+    private List<? extends StructField> fieldRefs;
+
+    /**
+     * output fields
+     */
+    private Object[] outputFields;
+
+    /**
+     * output field references
+     */
+    private StructField[] outputFieldRefs;
+
+    public HiveKeyValueParser(String serDeClass, String outputSerDeClass, Properties tbl, JobConf conf,
+            final IHyracksTaskContext ctx, int[] outputColumnsOffset) throws HyracksDataException {
+        try {
+            job = conf;
+            // initialize the input serde
+            serDe = (SerDe) ReflectionUtils.newInstance(Class.forName(serDeClass), job);
+            serDe.initialize(job, tbl);
+            // initialize the output serde
+            outputSerDe = (SerDe) ReflectionUtils.newInstance(Class.forName(outputSerDeClass), job);
+            outputSerDe.initialize(job, tbl);
+            // object inspector of the row
+            structInspector = (StructObjectInspector) serDe.getObjectInspector();
+            // hyracks context
+            this.ctx = ctx;
+            this.outputColumnsOffset = outputColumnsOffset;
+
+            if (structInspector instanceof LazySimpleStructObjectInspector) {
+                LazySimpleStructObjectInspector rowInspector = (LazySimpleStructObjectInspector) structInspector;
+                List<? extends StructField> fieldRefs = rowInspector.getAllStructFieldRefs();
+                boolean lightWeightParsable = true;
+                for (StructField fieldRef : fieldRefs) {
+                    Category category = fieldRef.getFieldObjectInspector().getCategory();
+                    if (!(category == Category.PRIMITIVE)) {
+                        lightWeightParsable = false;
+                        break;
+                    }
+                }
+                if (lightWeightParsable) {
+                    parser = new TextToBinaryTupleParser(this.outputColumnsOffset, structInspector);
+                }
+            }
+
+            fieldRefs = structInspector.getAllStructFieldRefs();
+            int size = 0;
+            for (int i = 0; i < outputColumnsOffset.length; i++) {
+                if (outputColumnsOffset[i] >= 0) {
+                    size++;
+                }
+            }
+
+            tb = new ArrayTupleBuilder(size);
+            outputFieldRefs = new StructField[size];
+            outputFields = new Object[size];
+            for (int i = 0; i < outputColumnsOffset.length; i++)
+                if (outputColumnsOffset[i] >= 0)
+                    outputFieldRefs[outputColumnsOffset[i]] = fieldRefs.get(i);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void open(IFrameWriter writer) throws HyracksDataException {
+        buffer = ctx.allocateFrame();
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(buffer, true);
+    }
+
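+    /**
+     * Two paths: if every column is primitive and the input is lazy text,
+     * TextToBinaryTupleParser translates the bytes directly into the tuple;
+     * otherwise the row is deserialized with the input SerDe and re-serialized
+     * field by field through the output SerDe.
+     */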
+    @Override
+    public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException {
+        try {
+            tb.reset();
+            if (parser != null) {
+                Text text = (Text) value;
+                parser.parse(text.getBytes(), 0, text.getLength(), tb);
+            } else {
+                Object row = serDe.deserialize((Writable) value);
+                /**
+                 * write fields to the tuple builder one by one
+                 */
+                int i = 0;
+                for (StructField fieldRef : fieldRefs) {
+                    if (outputColumnsOffset[i] >= 0)
+                        outputFields[outputColumnsOffset[i]] = structInspector.getStructFieldData(row, fieldRef);
+                    i++;
+                }
+                i = 0;
+                DataOutput dos = tb.getDataOutput();
+                for (Object field : outputFields) {
+                    BytesWritable fieldWritable = (BytesWritable) outputSerDe.serialize(field,
+                            outputFieldRefs[i].getFieldObjectInspector());
+                    dos.write(fieldWritable.getBytes(), 0, fieldWritable.getSize());
+                    tb.addFieldEndOffset();
+                    i++;
+                }
+            }
+
+            /**
+             * append the tuple and flush it if necessary.
+             */
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                FrameUtils.flushFrame(buffer, writer);
+                appender.reset(buffer, true);
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    throw new IllegalStateException();
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void close(IFrameWriter writer) throws HyracksDataException {
+        /**
+         * flush the residual tuples
+         */
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(buffer, writer);
+        }
+        System.gc();
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParserFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParserFactory.java
new file mode 100644
index 0000000..05903b9
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParserFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.util.Properties;
+
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+@SuppressWarnings("deprecation")
+public class HiveKeyValueParserFactory<K, V> implements IKeyValueParserFactory<K, V> {
+    private static final long serialVersionUID = 1L;
+    private final String serDeClass;
+    private final String outputSerDeClass = LazySerDe.class.getName();
+    private final Properties tbl;
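+    // JobConf itself is not Serializable; ConfFactory carries it to the task sites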
+    private final ConfFactory confFactory;
+    private final int[] outputColumnsOffset;
+
+    public HiveKeyValueParserFactory(PartitionDesc desc, JobConf conf, int[] outputColumnsOffset)
+            throws HyracksDataException {
+        this.tbl = desc.getProperties();
+        this.serDeClass = tbl.getProperty("serialization.lib");
+        this.outputColumnsOffset = outputColumnsOffset;
+        this.confFactory = new ConfFactory(conf);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new HiveKeyValueParser(serDeClass, outputSerDeClass, tbl, confFactory.getConf(), ctx,
+                outputColumnsOffset);
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
new file mode 100644
index 0000000..81faf38
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
@@ -0,0 +1,161 @@
+package edu.uci.ics.hivesterix.runtime.operator.filewrite;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hivesterix.serde.lazy.LazyColumnar;
+import edu.uci.ics.hivesterix.serde.lazy.objectinspector.LazyColumnarObjectInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+@SuppressWarnings("deprecation")
+public class HiveFileWritePushRuntime implements IPushRuntime {
+
+    /**
+     * frame tuple accessor to access byte buffer
+     */
+    private final FrameTupleAccessor accessor;
+
+    /**
+     * input object inspector
+     */
+    private final ObjectInspector inputInspector;
+
+    /**
+     * cached input row, reused across tuples
+     */
+    private final LazyColumnar cachedInput;
+
+    /**
+     * descriptor of the Hive file sink
+     */
+    private final FileSinkDesc fileSink;
+
+    /**
+     * job configuration, which contains the name node and other configuration
+     * information
+     */
+    private JobConf conf;
+
+    /**
+     * the input schema
+     */
+    private final Schema inputSchema;
+
+    /**
+     * a copy of the Hive schema representation
+     */
+    private RowSchema rowSchema;
+
+    /**
+     * the Hive file sink operator
+     */
+    private FileSinkOperator fsOp;
+
+    /**
+     * cached tuple object reference
+     */
+    private FrameTupleReference tuple = new FrameTupleReference();
+
+    /**
+     * @param context the Hyracks task context
+     * @param inputRecordDesc record descriptor of the incoming frames
+     * @param job the Hadoop job configuration
+     * @param fs descriptor of the Hive file sink
+     * @param schema the Hive row schema of the sink input
+     * @param oi the input schema used to build the object inspector
+     */
+    public HiveFileWritePushRuntime(IHyracksTaskContext context,
+            RecordDescriptor inputRecordDesc, JobConf job, FileSinkDesc fs,
+            RowSchema schema, Schema oi) {
+        fileSink = fs;
+        fileSink.setGatherStats(false);
+
+        rowSchema = schema;
+        conf = job;
+        inputSchema = oi;
+
+        accessor = new FrameTupleAccessor(context.getFrameSize(),
+                inputRecordDesc);
+        inputInspector = inputSchema.toObjectInspector();
+        cachedInput = new LazyColumnar(
+                (LazyColumnarObjectInspector) inputInspector);
+    }
+
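+    /*
+     * open() builds a fresh FileSinkOperator from its descriptor on the task
+     * side; parent/child links are cleared because it runs standalone here.
+     */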
+    @Override
+    public void open() throws HyracksDataException {
+        fsOp = (FileSinkOperator) OperatorFactory.get(fileSink, rowSchema);
+        fsOp.setChildOperators(null);
+        fsOp.setParentOperators(null);
+        conf.setClassLoader(this.getClass().getClassLoader());
+
+        ObjectInspector[] inspectors = new ObjectInspector[1];
+        inspectors[0] = inputInspector;
+        try {
+            fsOp.initialize(conf, inspectors);
+            fsOp.setExecContext(null);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int n = accessor.getTupleCount();
+        try {
+            for (int i = 0; i < n; ++i) {
+                tuple.reset(accessor, i);
+                cachedInput.init(tuple);
+                fsOp.process(cachedInput, 0);
+            }
+        } catch (HiveException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            Thread.currentThread().setContextClassLoader(
+                    this.getClass().getClassLoader());
+            fsOp.closeOp(false);
+        } catch (HiveException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void setFrameWriter(int index, IFrameWriter writer,
+            RecordDescriptor recordDesc) {
+        throw new IllegalStateException();
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index,
+            RecordDescriptor recordDescriptor) {
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HivePushRuntimeFactory.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HivePushRuntimeFactory.java
new file mode 100644
index 0000000..6c18231
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HivePushRuntimeFactory.java
@@ -0,0 +1,110 @@
+package edu.uci.ics.hivesterix.runtime.operator.filewrite;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.UUID;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+@SuppressWarnings("deprecation")
+public class HivePushRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RecordDescriptor inputRecordDesc;
+    private transient JobConf conf;
+    private final FileSinkDesc fileSink;
+    private final RowSchema outSchema;
+    private final Schema schema;
+
+    /**
+     * XML snapshot of the job configuration, used to rebuild conf remotely
+     */
+    private String confContent;
+
+    public HivePushRuntimeFactory(RecordDescriptor inputRecordDesc, JobConf conf, FileSinkOperator fsp, Schema sch) {
+        this.inputRecordDesc = inputRecordDesc;
+        this.conf = conf;
+        this.fileSink = fsp.getConf();
+        outSchema = fsp.getSchema();
+        this.schema = sch;
+
+        writeConfContent();
+    }
+
+    @Override
+    public String toString() {
+        return "file write";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext context) throws AlgebricksException {
+        if (conf == null)
+            readConfContent();
+
+        return new HiveFileWritePushRuntime(context, inputRecordDesc, conf, fileSink, outSchema, schema);
+    }
+
+    private void readConfContent() {
+        File dir = new File("hadoop-conf-tmp");
+        if (!dir.exists()) {
+            dir.mkdir();
+        }
+
+        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
+        try {
+            PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(fileName)));
+            out.write(confContent);
+            out.close();
+            conf = new JobConf(fileName);
+        } catch (Exception e) {
+            throw new IllegalStateException("cannot read the job conf content", e);
+        }
+    }
+
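+    /*
+     * JobConf itself is not Serializable, so the factory round-trips it
+     * through an XML snapshot: writeConfContent() captures it on the client
+     * and readConfContent() re-parses it where the transient conf is null.
+     */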
+    private void writeConfContent() {
+        File dir = new File("hadoop-conf-tmp");
+        if (!dir.exists()) {
+            dir.mkdir();
+        }
+
+        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
+        try {
+            DataOutputStream out = new DataOutputStream(new FileOutputStream(fileName));
+            conf.writeXml(out);
+            out.close();
+
+            BufferedReader in = new BufferedReader(new FileReader(fileName));
+            StringBuilder buffer = new StringBuilder();
+            String line;
+            while ((line = in.readLine()) != null) {
+                buffer.append(line + "\n");
+            }
+            in.close();
+            confContent = buffer.toString();
+        } catch (Exception e) {
+            throw new IllegalStateException("cannot write the job conf content", e);
+        }
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryComparatorFactoryProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryComparatorFactoryProvider.java
new file mode 100644
index 0000000..467ec0a
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryComparatorFactoryProvider.java
@@ -0,0 +1,75 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveByteBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveByteBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveDoubleBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveDoubleBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveFloatBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveFloatBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveIntegerBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveIntegerBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveLongBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveLongBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveShortBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveShortBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveStringBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveStringBinaryDescComparatorFactory;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveBinaryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider {
+
+    public static final HiveBinaryComparatorFactoryProvider INSTANCE = new HiveBinaryComparatorFactoryProvider();
+
+    private HiveBinaryComparatorFactoryProvider() {
+    }
+
+    @Override
+    public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending)
+            throws AlgebricksException {
+        if (type.equals(TypeInfoFactory.intTypeInfo)) {
+            if (ascending)
+                return HiveIntegerBinaryAscComparatorFactory.INSTANCE;
+            else
+                return HiveIntegerBinaryDescComparatorFactory.INSTANCE;
+
+        } else if (type.equals(TypeInfoFactory.longTypeInfo)) {
+            if (ascending)
+                return HiveLongBinaryAscComparatorFactory.INSTANCE;
+            else
+                return HiveLongBinaryDescComparatorFactory.INSTANCE;
+
+        } else if (type.equals(TypeInfoFactory.floatTypeInfo)) {
+            if (ascending)
+                return HiveFloatBinaryAscComparatorFactory.INSTANCE;
+            else
+                return HiveFloatBinaryDescComparatorFactory.INSTANCE;
+
+        } else if (type.equals(TypeInfoFactory.doubleTypeInfo)) {
+            if (ascending)
+                return HiveDoubleBinaryAscComparatorFactory.INSTANCE;
+            else
+                return HiveDoubleBinaryDescComparatorFactory.INSTANCE;
+        } else if (type.equals(TypeInfoFactory.shortTypeInfo)) {
+            if (ascending)
+                return HiveShortBinaryAscComparatorFactory.INSTANCE;
+            else
+                return HiveShortBinaryDescComparatorFactory.INSTANCE;
+        } else if (type.equals(TypeInfoFactory.stringTypeInfo)) {
+            if (ascending)
+                return HiveStringBinaryAscComparatorFactory.INSTANCE;
+            else
+                return HiveStringBinaryDescComparatorFactory.INSTANCE;
+        } else if (type.equals(TypeInfoFactory.byteTypeInfo) || type.equals(TypeInfoFactory.booleanTypeInfo)) {
+            if (ascending)
+                return HiveByteBinaryAscComparatorFactory.INSTANCE;
+            else
+                return HiveByteBinaryDescComparatorFactory.INSTANCE;
+        } else
+            throw new NotImplementedException();
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFactoryProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFactoryProvider.java
new file mode 100644
index 0000000..473eee1
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFactoryProvider.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveDoubleBinaryHashFunctionFactory;
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveIntegerBinaryHashFunctionFactory;
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveLongBinaryHashFunctionFactory;
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveRawBinaryHashFunctionFactory;
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveStingBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveBinaryHashFunctionFactoryProvider implements IBinaryHashFunctionFactoryProvider {
+
+    public static final HiveBinaryHashFunctionFactoryProvider INSTANCE = new HiveBinaryHashFunctionFactoryProvider();
+
+    private HiveBinaryHashFunctionFactoryProvider() {
+    }
+
+    @Override
+    public IBinaryHashFunctionFactory getBinaryHashFunctionFactory(Object type) throws AlgebricksException {
+        if (type.equals(TypeInfoFactory.intTypeInfo)) {
+            return HiveIntegerBinaryHashFunctionFactory.INSTANCE;
+        } else if (type.equals(TypeInfoFactory.longTypeInfo)) {
+            return HiveLongBinaryHashFunctionFactory.INSTANCE;
+        } else if (type.equals(TypeInfoFactory.stringTypeInfo)) {
+            return HiveStingBinaryHashFunctionFactory.INSTANCE;
+        } else if (type.equals(TypeInfoFactory.doubleTypeInfo)) {
+            return HiveDoubleBinaryHashFunctionFactory.INSTANCE;
+        } else {
+            return HiveRawBinaryHashFunctionFactory.INSTANCE;
+        }
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java
new file mode 100644
index 0000000..e7a2e79
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.MurmurHash3BinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class HiveBinaryHashFunctionFamilyProvider implements IBinaryHashFunctionFamilyProvider {
+
+    public static HiveBinaryHashFunctionFamilyProvider INSTANCE = new HiveBinaryHashFunctionFamilyProvider();
+
+    private HiveBinaryHashFunctionFamilyProvider() {
+
+    }
+
+    @Override
+    public IBinaryHashFunctionFamily getBinaryHashFunctionFamily(Object type) throws AlgebricksException {
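+        // every type hashes with the same MurmurHash3 family over its binary image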
+        return MurmurHash3BinaryHashFunctionFamily.INSTANCE;
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveNormalizedKeyComputerFactoryProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveNormalizedKeyComputerFactoryProvider.java
new file mode 100644
index 0000000..91bf3e5
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveNormalizedKeyComputerFactoryProvider.java
@@ -0,0 +1,52 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveDoubleAscNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveDoubleDescNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveIntegerAscNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveIntegerDescNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveLongAscNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveLongDescNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveStringAscNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveStringDescNormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveNormalizedKeyComputerFactoryProvider implements INormalizedKeyComputerFactoryProvider {
+
+    public static final HiveNormalizedKeyComputerFactoryProvider INSTANCE = new HiveNormalizedKeyComputerFactoryProvider();
+
+    private HiveNormalizedKeyComputerFactoryProvider() {
+    }
+
+    @Override
+    public INormalizedKeyComputerFactory getNormalizedKeyComputerFactory(Object type, boolean ascending) {
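+        // returning null disables the normalized-key optimization for that type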
+        if (ascending) {
+            if (type.equals(TypeInfoFactory.stringTypeInfo)) {
+                return new HiveStringAscNormalizedKeyComputerFactory();
+            } else if (type.equals(TypeInfoFactory.intTypeInfo)) {
+                return new HiveIntegerAscNormalizedKeyComputerFactory();
+            } else if (type.equals(TypeInfoFactory.longTypeInfo)) {
+                return new HiveLongAscNormalizedKeyComputerFactory();
+            } else if (type.equals(TypeInfoFactory.doubleTypeInfo)) {
+                return new HiveDoubleAscNormalizedKeyComputerFactory();
+            } else {
+                return null;
+            }
+        } else {
+            if (type.equals(TypeInfoFactory.stringTypeInfo)) {
+                return new HiveStringDescNormalizedKeyComputerFactory();
+            } else if (type.equals(TypeInfoFactory.intTypeInfo)) {
+                return new HiveIntegerDescNormalizedKeyComputerFactory();
+            } else if (type.equals(TypeInfoFactory.longTypeInfo)) {
+                return new HiveLongDescNormalizedKeyComputerFactory();
+            } else if (type.equals(TypeInfoFactory.doubleTypeInfo)) {
+                return new HiveDoubleDescNormalizedKeyComputerFactory();
+            } else {
+                return null;
+            }
+        }
+    }
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HivePrinterFactoryProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HivePrinterFactoryProvider.java
new file mode 100644
index 0000000..10c84d2
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HivePrinterFactoryProvider.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
+
+public class HivePrinterFactoryProvider implements IPrinterFactoryProvider {
+
+    public static IPrinterFactoryProvider INSTANCE = new HivePrinterFactoryProvider();
+
+    @Override
+    public IPrinterFactory getPrinterFactory(Object type) throws AlgebricksException {
+        return null;
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveSerializerDeserializerProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveSerializerDeserializerProvider.java
new file mode 100644
index 0000000..22f81e0
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveSerializerDeserializerProvider.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+
+public class HiveSerializerDeserializerProvider implements ISerializerDeserializerProvider {
+
+    public static final HiveSerializerDeserializerProvider INSTANCE = new HiveSerializerDeserializerProvider();
+
+    private HiveSerializerDeserializerProvider() {
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public ISerializerDeserializer getSerializerDeserializer(Object type) throws AlgebricksException {
+        // return ARecordSerializerDeserializer.SCHEMALESS_INSTANCE;
+        return null;
+    }
+
+}
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveTypeTraitProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveTypeTraitProvider.java
new file mode 100644
index 0000000..be4b149
--- /dev/null
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveTypeTraitProvider.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+
+public class HiveTypeTraitProvider implements ITypeTraitProvider, Serializable {
+    private static final long serialVersionUID = 1L;
+    public static HiveTypeTraitProvider INSTANCE = new HiveTypeTraitProvider();
+
+    private HiveTypeTraitProvider() {
+
+    }
+
+    @Override
+    public ITypeTraits getTypeTrait(Object arg0) {
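+        // every Hive type is treated as variable-length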
+        return new ITypeTraits() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public int getFixedLength() {
+                return -1;
+            }
+
+            @Override
+            public boolean isFixedLength() {
+                return false;
+            }
+
+        };
+    }
+}