split hivesterix into serveral modules
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3074 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-runtime/pom.xml b/hivesterix/hivesterix-runtime/pom.xml
new file mode 100644
index 0000000..77db0d4
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/pom.xml
@@ -0,0 +1,383 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hivesterix-runtime</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <name>hivesterix-runtime</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <version>2.5</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ <version>2.0.12</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>20090211</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>8.0.0.M1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>8.0.0.M1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ </dependency>
+ <dependency>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ <version>0.9.94</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.datanucleus</groupId>
+ <artifactId>datanucleus-core</artifactId>
+ <version>2.0.3</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.datanucleus</groupId>
+ <artifactId>datanucleus-connectionpool</artifactId>
+ <version>2.0.3</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.datanucleus</groupId>
+ <artifactId>datanucleus-enhancer</artifactId>
+ <version>2.0.3</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.datanucleus</groupId>
+ <artifactId>datanucleus-rdbms</artifactId>
+ <version>2.0.3</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-dbcp</groupId>
+ <artifactId>commons-dbcp</artifactId>
+ <version>1.4</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ <version>1.5.4</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.4</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax</groupId>
+ <artifactId>jdo2-api</artifactId>
+ <version>2.3-ec</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook</groupId>
+ <artifactId>libfb303</artifactId>
+ <version>0.5.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.5.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>cli</artifactId>
+ <version>1.2</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.15</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr-runtime</artifactId>
+ <version>3.0.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-cli</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-hwi</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-shims</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>0.7.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.6.1</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.1.1</version>
+ <type>jar</type>
+ <classifier>api</classifier>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>r06</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>stringtemplate</artifactId>
+ <version>3.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.8.1.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>0.90.3</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>algebricks-compiler</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hivesterix-serde</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hivesterix-common</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ <encoding>UTF-8</encoding>
+ <fork>true</fork>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>patch</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <classifier>patch</classifier>
+ <finalName>a-hive-rumtime</finalName>
+ <includes>
+ <include>**/org/apache/**</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <repositories>
+ <repository>
+ <releases>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>warn</checksumPolicy>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ <id>third-party</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/repositories/third-party</url>
+ </repository>
+ <repository>
+ <releases>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>warn</checksumPolicy>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ <id>hyracks-public-release</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/repositories/hyracks-public-releases</url>
+ </repository>
+ </repositories>
+</project>
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AbstractExpressionEvaluator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AbstractExpressionEvaluator.java
new file mode 100644
index 0000000..ad02239
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AbstractExpressionEvaluator.java
@@ -0,0 +1,169 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractExpressionEvaluator implements ICopyEvaluator {
+
+ private List<ICopyEvaluator> children;
+
+ private ExprNodeEvaluator evaluator;
+
+ private IDataOutputProvider out;
+
+ private ObjectInspector inspector;
+
+ /**
+ * output object inspector
+ */
+ private ObjectInspector outputInspector;
+
+ /**
+ * cached row object
+ */
+ private LazyObject<? extends ObjectInspector> cachedRowObject;
+
+ /**
+ * serializer/derialzer for lazy object
+ */
+ private SerDe lazySer;
+
+ /**
+ * data output
+ */
+ DataOutput dataOutput;
+
+ public AbstractExpressionEvaluator(ExprNodeEvaluator hiveEvaluator, ObjectInspector oi, IDataOutputProvider output)
+ throws AlgebricksException {
+ evaluator = hiveEvaluator;
+ out = output;
+ inspector = oi;
+ dataOutput = out.getDataOutput();
+ }
+
+ protected ObjectInspector getRowInspector() {
+ return null;
+ }
+
+ protected IDataOutputProvider getIDataOutputProvider() {
+ return out;
+ }
+
+ protected ExprNodeEvaluator getHiveEvaluator() {
+ return evaluator;
+ }
+
+ public ObjectInspector getObjectInspector() {
+ return inspector;
+ }
+
+ @Override
+ public void evaluate(IFrameTupleReference r) throws AlgebricksException {
+ // initialize hive evaluator
+ try {
+ if (outputInspector == null)
+ outputInspector = evaluator.initialize(inspector);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException(e.getMessage());
+ }
+
+ readIntoCache(r);
+ try {
+ Object result = evaluator.evaluate(cachedRowObject);
+
+ // if (result == null) {
+ // result = evaluator.evaluate(cachedRowObject);
+ //
+ // // check if result is null
+ //
+ // String errorMsg = "serialize null object in \n output " +
+ // outputInspector.toString() + " \n input "
+ // + inspector.toString() + "\n ";
+ // errorMsg += "";
+ // List<Object> columns = ((StructObjectInspector)
+ // inspector).getStructFieldsDataAsList(cachedRowObject);
+ // for (Object column : columns) {
+ // errorMsg += column.toString() + " ";
+ // }
+ // errorMsg += "\n";
+ // Log.info(errorMsg);
+ // System.out.println(errorMsg);
+ // // result = new BooleanWritable(true);
+ // throw new IllegalStateException(errorMsg);
+ // }
+
+ serializeResult(result);
+ } catch (HiveException e) {
+ e.printStackTrace();
+ throw new AlgebricksException(e.getMessage());
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new AlgebricksException(e.getMessage());
+ }
+ }
+
+ /**
+ * serialize the result
+ *
+ * @param result
+ * the evaluation result
+ * @throws IOException
+ * @throws AlgebricksException
+ */
+ private void serializeResult(Object result) throws IOException, AlgebricksException {
+ if (lazySer == null)
+ lazySer = new LazySerDe();
+
+ try {
+ BytesWritable outputWritable = (BytesWritable) lazySer.serialize(result, outputInspector);
+ dataOutput.write(outputWritable.getBytes(), 0, outputWritable.getLength());
+ } catch (SerDeException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ /**
+ * bind the tuple reference to the cached row object
+ *
+ * @param r
+ */
+ private void readIntoCache(IFrameTupleReference r) {
+ if (cachedRowObject == null)
+ cachedRowObject = (LazyObject<? extends ObjectInspector>) LazyFactory.createLazyObject(inspector);
+ cachedRowObject.init(r);
+ }
+
+ /**
+ * set a list of children of this evaluator
+ *
+ * @param children
+ */
+ public void setChildren(List<ICopyEvaluator> children) {
+ this.children = children;
+ }
+
+ public void addChild(ICopyEvaluator child) {
+ if (children == null)
+ children = new ArrayList<ICopyEvaluator>();
+ children.add(child);
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregationFunctionEvaluator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregationFunctionEvaluator.java
new file mode 100644
index 0000000..e500376
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregationFunctionEvaluator.java
@@ -0,0 +1,224 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class AggregationFunctionEvaluator implements ICopyAggregateFunction {
+
+ /**
+ * the mode of aggregation function
+ */
+ private GenericUDAFEvaluator.Mode mode;
+
+ /**
+ * an array of evaluators
+ */
+ private ExprNodeEvaluator[] evaluators;
+
+ /**
+ * udaf evaluator partial
+ */
+ private GenericUDAFEvaluator udafPartial;
+
+ /**
+ * udaf evaluator complete
+ */
+ private GenericUDAFEvaluator udafComplete;
+
+ /**
+ * cached parameter objects
+ */
+ private Object[] cachedParameters;
+
+ /**
+ * cached row objects
+ */
+ private LazyObject<? extends ObjectInspector> cachedRowObject;
+
+ /**
+ * the output channel
+ */
+ private DataOutput out;
+
+ /**
+ * aggregation buffer
+ */
+ private AggregationBuffer aggBuffer;
+
+ /**
+ * we only use lazy serde to do serialization
+ */
+ private SerDe lazySer;
+
+ /**
+ * the output object inspector for this aggregation function
+ */
+ private ObjectInspector outputInspector;
+
+ /**
+ * the output object inspector for this aggregation function
+ */
+ private ObjectInspector outputInspectorPartial;
+
+ /**
+ * parameter inspectors
+ */
+ private ObjectInspector[] parameterInspectors;
+
+ /**
+ * output make sure the aggregation functio has least object creation
+ *
+ * @param desc
+ * @param oi
+ * @param output
+ */
+ public AggregationFunctionEvaluator(List<ExprNodeDesc> inputs, List<TypeInfo> inputTypes, String genericUDAFName,
+ GenericUDAFEvaluator.Mode aggMode, boolean distinct, ObjectInspector oi, DataOutput output,
+ ExprNodeEvaluator[] evals, ObjectInspector[] pInspectors, Object[] parameterCache, SerDe serde,
+ LazyObject<? extends ObjectInspector> row, GenericUDAFEvaluator udafunctionPartial,
+ GenericUDAFEvaluator udafunctionComplete, ObjectInspector outputOi, ObjectInspector outputOiPartial) {
+ // shared object across threads
+ this.out = output;
+ this.mode = aggMode;
+ this.parameterInspectors = pInspectors;
+
+ // thread local objects
+ this.evaluators = evals;
+ this.cachedParameters = parameterCache;
+ this.cachedRowObject = row;
+ this.lazySer = serde;
+ this.udafPartial = udafunctionPartial;
+ this.udafComplete = udafunctionComplete;
+ this.outputInspector = outputOi;
+ this.outputInspectorPartial = outputOiPartial;
+ }
+
+ @Override
+ public void init() throws AlgebricksException {
+ try {
+ aggBuffer = udafPartial.getNewAggregationBuffer();
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ readIntoCache(tuple);
+ processRow();
+ }
+
+ private void processRow() throws AlgebricksException {
+ try {
+ // get values by evaluating them
+ for (int i = 0; i < cachedParameters.length; i++) {
+ cachedParameters[i] = evaluators[i].evaluate(cachedRowObject);
+ }
+ processAggregate();
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ private void processAggregate() throws HiveException {
+ /**
+ * accumulate the aggregation function
+ */
+ switch (mode) {
+ case PARTIAL1:
+ case COMPLETE:
+ udafPartial.iterate(aggBuffer, cachedParameters);
+ break;
+ case PARTIAL2:
+ case FINAL:
+ if (udafPartial instanceof GenericUDAFCount.GenericUDAFCountEvaluator) {
+ Object parameter = ((PrimitiveObjectInspector) parameterInspectors[0])
+ .getPrimitiveWritableObject(cachedParameters[0]);
+ udafPartial.merge(aggBuffer, parameter);
+ } else
+ udafPartial.merge(aggBuffer, cachedParameters[0]);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * serialize the result
+ *
+ * @param result
+ * the evaluation result
+ * @throws IOException
+ * @throws AlgebricksException
+ */
+ private void serializeResult(Object result, ObjectInspector oi) throws IOException, AlgebricksException {
+ try {
+ BytesWritable outputWritable = (BytesWritable) lazySer.serialize(result, oi);
+ out.write(outputWritable.getBytes(), 0, outputWritable.getLength());
+ } catch (SerDeException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ /**
+ * bind the tuple reference to the cached row object
+ *
+ * @param r
+ */
+ private void readIntoCache(IFrameTupleReference r) {
+ cachedRowObject.init(r);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ // aggregator
+ try {
+ Object result = null;
+ result = udafPartial.terminatePartial(aggBuffer);
+ if (mode == GenericUDAFEvaluator.Mode.COMPLETE || mode == GenericUDAFEvaluator.Mode.FINAL) {
+ result = udafComplete.terminate(aggBuffer);
+ serializeResult(result, outputInspector);
+ } else {
+ serializeResult(result, outputInspectorPartial);
+ }
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ // aggregator.
+ try {
+ Object result = null;
+ // get aggregations
+ result = udafPartial.terminatePartial(aggBuffer);
+ serializeResult(result, outputInspectorPartial);
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregatuibFunctionSerializableEvaluator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregatuibFunctionSerializableEvaluator.java
new file mode 100644
index 0000000..1933253
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/AggregatuibFunctionSerializableEvaluator.java
@@ -0,0 +1,248 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class AggregatuibFunctionSerializableEvaluator implements ICopySerializableAggregateFunction {
+
+ /**
+ * the mode of aggregation function
+ */
+ private GenericUDAFEvaluator.Mode mode;
+
+ /**
+ * an array of evaluators
+ */
+ private ExprNodeEvaluator[] evaluators;
+
+ /**
+ * udaf evaluator partial
+ */
+ private GenericUDAFEvaluator udafPartial;
+
+ /**
+ * udaf evaluator complete
+ */
+ private GenericUDAFEvaluator udafComplete;
+
+ /**
+ * cached parameter objects
+ */
+ private Object[] cachedParameters;
+
+ /**
+ * cached row objects
+ */
+ private LazyObject<? extends ObjectInspector> cachedRowObject;
+
+ /**
+ * aggregation buffer
+ */
+ private SerializableBuffer aggBuffer;
+
+ /**
+ * we only use lazy serde to do serialization
+ */
+ private SerDe lazySer;
+
+ /**
+ * the output object inspector for this aggregation function
+ */
+ private ObjectInspector outputInspector;
+
+ /**
+ * the output object inspector for this aggregation function
+ */
+ private ObjectInspector outputInspectorPartial;
+
+ /**
+ * parameter inspectors
+ */
+ private ObjectInspector[] parameterInspectors;
+
+ /**
+ * output make sure the aggregation functio has least object creation
+ *
+ * @param desc
+ * @param oi
+ * @param output
+ */
+ public AggregatuibFunctionSerializableEvaluator(List<ExprNodeDesc> inputs, List<TypeInfo> inputTypes,
+ String genericUDAFName, GenericUDAFEvaluator.Mode aggMode, boolean distinct, ObjectInspector oi,
+ ExprNodeEvaluator[] evals, ObjectInspector[] pInspectors, Object[] parameterCache, SerDe serde,
+ LazyObject<? extends ObjectInspector> row, GenericUDAFEvaluator udafunctionPartial,
+ GenericUDAFEvaluator udafunctionComplete, ObjectInspector outputOi, ObjectInspector outputOiPartial)
+ throws AlgebricksException {
+ // shared object across threads
+ this.mode = aggMode;
+ this.parameterInspectors = pInspectors;
+
+ // thread local objects
+ this.evaluators = evals;
+ this.cachedParameters = parameterCache;
+ this.cachedRowObject = row;
+ this.lazySer = serde;
+ this.udafPartial = udafunctionPartial;
+ this.udafComplete = udafunctionComplete;
+ this.outputInspector = outputOi;
+ this.outputInspectorPartial = outputOiPartial;
+
+ try {
+ aggBuffer = (SerializableBuffer) udafPartial.getNewAggregationBuffer();
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void init(DataOutput output) throws AlgebricksException {
+ try {
+ udafPartial.reset(aggBuffer);
+ outputAggBuffer(aggBuffer, output);
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] data, int start, int len) throws AlgebricksException {
+ deSerializeAggBuffer(aggBuffer, data, start, len);
+ readIntoCache(tuple);
+ processRow();
+ serializeAggBuffer(aggBuffer, data, start, len);
+ }
+
+ private void processRow() throws AlgebricksException {
+ try {
+ // get values by evaluating them
+ for (int i = 0; i < cachedParameters.length; i++) {
+ cachedParameters[i] = evaluators[i].evaluate(cachedRowObject);
+ }
+ processAggregate();
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ private void processAggregate() throws HiveException {
+ /**
+ * accumulate the aggregation function
+ */
+ switch (mode) {
+ case PARTIAL1:
+ case COMPLETE:
+ udafPartial.iterate(aggBuffer, cachedParameters);
+ break;
+ case PARTIAL2:
+ case FINAL:
+ if (udafPartial instanceof GenericUDAFCount.GenericUDAFCountEvaluator) {
+ Object parameter = ((PrimitiveObjectInspector) parameterInspectors[0])
+ .getPrimitiveWritableObject(cachedParameters[0]);
+ udafPartial.merge(aggBuffer, parameter);
+ } else
+ udafPartial.merge(aggBuffer, cachedParameters[0]);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * serialize the result
+ *
+ * @param result
+ * the evaluation result
+ * @throws IOException
+ * @throws AlgebricksException
+ */
+ private void serializeResult(Object result, ObjectInspector oi, DataOutput out) throws IOException,
+ AlgebricksException {
+ try {
+ BytesWritable outputWritable = (BytesWritable) lazySer.serialize(result, oi);
+ out.write(outputWritable.getBytes(), 0, outputWritable.getLength());
+ } catch (SerDeException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ /**
+ * bind the tuple reference to the cached row object
+ *
+ * @param r
+ */
+ private void readIntoCache(IFrameTupleReference r) {
+ cachedRowObject.init(r);
+ }
+
+ @Override
+ public void finish(byte[] data, int start, int len, DataOutput output) throws AlgebricksException {
+ deSerializeAggBuffer(aggBuffer, data, start, len);
+ // aggregator
+ try {
+ Object result = null;
+ result = udafPartial.terminatePartial(aggBuffer);
+ if (mode == GenericUDAFEvaluator.Mode.COMPLETE || mode == GenericUDAFEvaluator.Mode.FINAL) {
+ result = udafComplete.terminate(aggBuffer);
+ serializeResult(result, outputInspector, output);
+ } else {
+ serializeResult(result, outputInspectorPartial, output);
+ }
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial(byte[] data, int start, int len, DataOutput output) throws AlgebricksException {
+ deSerializeAggBuffer(aggBuffer, data, start, len);
+ // aggregator.
+ try {
+ Object result = null;
+ // get aggregations
+ result = udafPartial.terminatePartial(aggBuffer);
+ serializeResult(result, outputInspectorPartial, output);
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ private void serializeAggBuffer(SerializableBuffer buffer, byte[] data, int start, int len)
+ throws AlgebricksException {
+ buffer.serializeAggBuffer(data, start, len);
+ }
+
+ private void deSerializeAggBuffer(SerializableBuffer buffer, byte[] data, int start, int len)
+ throws AlgebricksException {
+ buffer.deSerializeAggBuffer(data, start, len);
+ }
+
+ private void outputAggBuffer(SerializableBuffer buffer, DataOutput out) throws AlgebricksException {
+ try {
+ buffer.serializeAggBuffer(out);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/BufferSerDeUtil.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/BufferSerDeUtil.java
new file mode 100644
index 0000000..96065e5
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/BufferSerDeUtil.java
@@ -0,0 +1,67 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+public class BufferSerDeUtil {
+
+ public static double getDouble(byte[] bytes, int offset) {
+ return Double.longBitsToDouble(getLong(bytes, offset));
+ }
+
+ public static float getFloat(byte[] bytes, int offset) {
+ return Float.intBitsToFloat(getInt(bytes, offset));
+ }
+
+ public static boolean getBoolean(byte[] bytes, int offset) {
+ if (bytes[offset] == 0)
+ return false;
+ else
+ return true;
+ }
+
+ public static int getInt(byte[] bytes, int offset) {
+ return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+ + ((bytes[offset + 3] & 0xff) << 0);
+ }
+
+ public static long getLong(byte[] bytes, int offset) {
+ return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
+ + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
+ + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
+ + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+ }
+
+ public static void writeBoolean(boolean value, byte[] bytes, int offset) {
+ if (value)
+ bytes[offset] = (byte) 1;
+ else
+ bytes[offset] = (byte) 0;
+ }
+
+ public static void writeInt(int value, byte[] bytes, int offset) {
+ bytes[offset++] = (byte) (value >> 24);
+ bytes[offset++] = (byte) (value >> 16);
+ bytes[offset++] = (byte) (value >> 8);
+ bytes[offset++] = (byte) (value);
+ }
+
+ public static void writeLong(long value, byte[] bytes, int offset) {
+ bytes[offset++] = (byte) (value >> 56);
+ bytes[offset++] = (byte) (value >> 48);
+ bytes[offset++] = (byte) (value >> 40);
+ bytes[offset++] = (byte) (value >> 32);
+ bytes[offset++] = (byte) (value >> 24);
+ bytes[offset++] = (byte) (value >> 16);
+ bytes[offset++] = (byte) (value >> 8);
+ bytes[offset++] = (byte) (value);
+ }
+
+ public static void writeDouble(double value, byte[] bytes, int offset) {
+ long lValue = Double.doubleToLongBits(value);
+ writeLong(lValue, bytes, offset);
+ }
+
+ public static void writeFloat(float value, byte[] bytes, int offset) {
+ int iValue = Float.floatToIntBits(value);
+ writeInt(iValue, bytes, offset);
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ColumnExpressionEvaluator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ColumnExpressionEvaluator.java
new file mode 100644
index 0000000..5647f6a
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ColumnExpressionEvaluator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ColumnExpressionEvaluator extends AbstractExpressionEvaluator {
+
+ public ColumnExpressionEvaluator(ExprNodeColumnDesc expr, ObjectInspector oi, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(new ExprNodeColumnEvaluator(expr), oi, output);
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ConstantExpressionEvaluator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ConstantExpressionEvaluator.java
new file mode 100644
index 0000000..d8796ea
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/ConstantExpressionEvaluator.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ConstantExpressionEvaluator extends AbstractExpressionEvaluator {
+
+ public ConstantExpressionEvaluator(ExprNodeConstantDesc expr, ObjectInspector oi, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(new ExprNodeConstantEvaluator(expr), oi, output);
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FieldExpressionEvaluator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FieldExpressionEvaluator.java
new file mode 100644
index 0000000..35560b6
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FieldExpressionEvaluator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeFieldEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class FieldExpressionEvaluator extends AbstractExpressionEvaluator {
+
+ public FieldExpressionEvaluator(ExprNodeFieldDesc expr, ObjectInspector oi, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(new ExprNodeFieldEvaluator(expr), oi, output);
+ }
+
+}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FunctionExpressionEvaluator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FunctionExpressionEvaluator.java
new file mode 100644
index 0000000..7ffec7a
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/FunctionExpressionEvaluator.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeGenericFuncEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class FunctionExpressionEvaluator extends AbstractExpressionEvaluator {
+
+ public FunctionExpressionEvaluator(ExprNodeGenericFuncDesc expr, ObjectInspector oi, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(new ExprNodeGenericFuncEvaluator(expr), oi, output);
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/NullExpressionEvaluator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/NullExpressionEvaluator.java
new file mode 100644
index 0000000..ca60385
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/NullExpressionEvaluator.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeNullEvaluator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class NullExpressionEvaluator extends AbstractExpressionEvaluator {
+
+ public NullExpressionEvaluator(ExprNodeNullDesc expr, ObjectInspector oi, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(new ExprNodeNullEvaluator(expr), oi, output);
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/SerializableBuffer.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/SerializableBuffer.java
new file mode 100644
index 0000000..676989e
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/SerializableBuffer.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+
+public interface SerializableBuffer extends AggregationBuffer {
+
+ public void deSerializeAggBuffer(byte[] data, int start, int len);
+
+ public void serializeAggBuffer(byte[] data, int start, int len);
+
+ public void serializeAggBuffer(DataOutput output) throws IOException;
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/UDTFFunctionEvaluator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/UDTFFunctionEvaluator.java
new file mode 100644
index 0000000..2e78663
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/evaluator/UDTFFunctionEvaluator.java
@@ -0,0 +1,143 @@
+package edu.uci.ics.hivesterix.runtime.evaluator;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
+import org.apache.hadoop.hive.ql.udf.generic.Collector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hivesterix.serde.lazy.LazyColumnar;
+import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class UDTFFunctionEvaluator implements ICopyUnnestingFunction, Collector {
+
+ /**
+ * udtf function
+ */
+ private UDTFDesc func;
+
+ /**
+ * input object inspector
+ */
+ private ObjectInspector inputInspector;
+
+ /**
+ * output object inspector
+ */
+ private ObjectInspector outputInspector;
+
+ /**
+ * object inspector for udtf
+ */
+ private ObjectInspector[] udtfInputOIs;
+
+ /**
+ * generic udtf
+ */
+ private GenericUDTF udtf;
+
+ /**
+ * data output
+ */
+ private DataOutput out;
+
+ /**
+ * the input row object
+ */
+ private LazyColumnar cachedRowObject;
+
+ /**
+ * cached row object (input)
+ */
+ private Object[] cachedInputObjects;
+
+ /**
+ * serialization/deserialization
+ */
+ private SerDe lazySerDe;
+
+ /**
+ * columns feed into UDTF
+ */
+ private int[] columns;
+
+ public UDTFFunctionEvaluator(UDTFDesc desc, Schema schema, int[] cols, DataOutput output) {
+ this.func = desc;
+ this.inputInspector = schema.toObjectInspector();
+ udtf = func.getGenericUDTF();
+ out = output;
+ columns = cols;
+ }
+
+ @Override
+ public void init(IFrameTupleReference tuple) throws AlgebricksException {
+ cachedInputObjects = new LazyObject[columns.length];
+ try {
+ cachedRowObject = (LazyColumnar) LazyFactory.createLazyObject(inputInspector);
+ outputInspector = udtf.initialize(udtfInputOIs);
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ udtf.setCollector(this);
+ lazySerDe = new LazySerDe();
+ readIntoCache(tuple);
+ }
+
+ @Override
+ public boolean step() throws AlgebricksException {
+ try {
+ udtf.process(cachedInputObjects);
+ return true;
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ /**
+ * bind the tuple reference to the cached row object
+ *
+ * @param r
+ */
+ private void readIntoCache(IFrameTupleReference r) {
+ cachedRowObject.init(r);
+ for (int i = 0; i < cachedInputObjects.length; i++) {
+ cachedInputObjects[i] = cachedRowObject.getField(columns[i]);
+ }
+ }
+
+ /**
+ * serialize the result
+ *
+ * @param result
+ * the evaluation result
+ * @throws IOException
+ * @throws AlgebricksException
+ */
+ private void serializeResult(Object result) throws SerDeException, IOException {
+ BytesWritable outputWritable = (BytesWritable) lazySerDe.serialize(result, outputInspector);
+ out.write(outputWritable.getBytes(), 0, outputWritable.getLength());
+ }
+
+ @Override
+ public void collect(Object input) throws HiveException {
+ try {
+ serializeResult(input);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryAscComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..f3b76e4
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryAscComparatorFactory.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveByteBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveByteBinaryAscComparatorFactory INSTANCE = new HiveByteBinaryAscComparatorFactory();
+
+ private HiveByteBinaryAscComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private byte left;
+ private byte right;
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ left = b1[s1];
+ right = b2[s2];
+ if (left > right)
+ return 1;
+ else if (left == right)
+ return 0;
+ else
+ return -1;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryDescComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..8d452dc
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveByteBinaryDescComparatorFactory.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveByteBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveByteBinaryDescComparatorFactory INSTANCE = new HiveByteBinaryDescComparatorFactory();
+
+ private HiveByteBinaryDescComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private byte left;
+ private byte right;
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ left = b1[s1];
+ right = b2[s2];
+ if (left > right)
+ return -1;
+ else if (left == right)
+ return 0;
+ else
+ return 1;
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryAscComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..0b5350a
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryAscComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveDoubleBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveDoubleBinaryAscComparatorFactory INSTANCE = new HiveDoubleBinaryAscComparatorFactory();
+
+ private HiveDoubleBinaryAscComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private double left;
+ private double right;
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ left = Double.longBitsToDouble(LazyUtils.byteArrayToLong(b1, s1));
+ right = Double.longBitsToDouble(LazyUtils.byteArrayToLong(b2, s2));
+ if (left > right)
+ return 1;
+ else if (left == right)
+ return 0;
+ else
+ return -1;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryDescComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..2405956
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveDoubleBinaryDescComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveDoubleBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveDoubleBinaryDescComparatorFactory INSTANCE = new HiveDoubleBinaryDescComparatorFactory();
+
+ private HiveDoubleBinaryDescComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private double left;
+ private double right;
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ left = Double.longBitsToDouble(LazyUtils.byteArrayToLong(b1, s1));
+ right = Double.longBitsToDouble(LazyUtils.byteArrayToLong(b2, s2));
+ if (left > right)
+ return -1;
+ else if (left == right)
+ return 0;
+ else
+ return 1;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryAscComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..05a43e6
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryAscComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveFloatBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveFloatBinaryAscComparatorFactory INSTANCE = new HiveFloatBinaryAscComparatorFactory();
+
+ private HiveFloatBinaryAscComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private float left;
+ private float right;
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ left = Float.intBitsToFloat(LazyUtils.byteArrayToInt(b1, s1));
+ right = Float.intBitsToFloat(LazyUtils.byteArrayToInt(b2, s2));
+ if (left > right)
+ return 1;
+ else if (left == right)
+ return 0;
+ else
+ return -1;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryDescComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..2c44f97
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveFloatBinaryDescComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveFloatBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveFloatBinaryDescComparatorFactory INSTANCE = new HiveFloatBinaryDescComparatorFactory();
+
+ private HiveFloatBinaryDescComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private float left;
+ private float right;
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ left = Float.intBitsToFloat(LazyUtils.byteArrayToInt(b1, s1));
+ right = Float.intBitsToFloat(LazyUtils.byteArrayToInt(b2, s2));
+ if (left > right)
+ return -1;
+ else if (left == right)
+ return 0;
+ else
+ return 1;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryAscComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..0127791
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryAscComparatorFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveIntegerBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final HiveIntegerBinaryAscComparatorFactory INSTANCE = new HiveIntegerBinaryAscComparatorFactory();
+
+ private HiveIntegerBinaryAscComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private VInt left = new VInt();
+ private VInt right = new VInt();
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ LazyUtils.readVInt(b1, s1, left);
+ LazyUtils.readVInt(b2, s2, right);
+
+ if (left.length != l1 || right.length != l2)
+ throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+ + left.length + "," + right.length + " expected " + l1 + "," + l2);
+
+ if (left.value > right.value)
+ return 1;
+ else if (left.value == right.value)
+ return 0;
+ else
+ return -1;
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryDescComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..5116337
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveIntegerBinaryDescComparatorFactory.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveIntegerBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final HiveIntegerBinaryDescComparatorFactory INSTANCE = new HiveIntegerBinaryDescComparatorFactory();
+
+ private HiveIntegerBinaryDescComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private VInt left = new VInt();
+ private VInt right = new VInt();
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ LazyUtils.readVInt(b1, s1, left);
+ LazyUtils.readVInt(b2, s2, right);
+ if (left.length != l1 || right.length != l2)
+ throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+ + left.length + " expected " + l1);
+ if (left.value > right.value)
+ return -1;
+ else if (left.value == right.value)
+ return 0;
+ else
+ return 1;
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryAscComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..fa416a9
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryAscComparatorFactory.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VLong;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveLongBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final HiveLongBinaryAscComparatorFactory INSTANCE = new HiveLongBinaryAscComparatorFactory();
+
+ private HiveLongBinaryAscComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private VLong left = new VLong();
+ private VLong right = new VLong();
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ LazyUtils.readVLong(b1, s1, left);
+ LazyUtils.readVLong(b2, s2, right);
+ if (left.length != l1 || right.length != l2)
+ throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+ + left.length + " expected " + l1);
+ if (left.value > right.value)
+ return 1;
+ else if (left.value == right.value)
+ return 0;
+ else
+ return -1;
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryDescComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..e72dc62
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveLongBinaryDescComparatorFactory.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VLong;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveLongBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final HiveLongBinaryDescComparatorFactory INSTANCE = new HiveLongBinaryDescComparatorFactory();
+
+ private HiveLongBinaryDescComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private VLong left = new VLong();
+ private VLong right = new VLong();
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ LazyUtils.readVLong(b1, s1, left);
+ LazyUtils.readVLong(b2, s2, right);
+ if (left.length != l1 || right.length != l2)
+ throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+ + left.length + " expected " + l1);
+ if (left.value > right.value)
+ return -1;
+ else if (left.value == right.value)
+ return 0;
+ else
+ return 1;
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryAscComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..a3745fa
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryAscComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveShortBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveShortBinaryAscComparatorFactory INSTANCE = new HiveShortBinaryAscComparatorFactory();
+
+ private HiveShortBinaryAscComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private short left;
+ private short right;
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ left = LazyUtils.byteArrayToShort(b1, s1);
+ right = LazyUtils.byteArrayToShort(b2, s2);
+ if (left > right)
+ return 1;
+ else if (left == right)
+ return 0;
+ else
+ return -1;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryDescComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..44d3f43
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveShortBinaryDescComparatorFactory.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveShortBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveShortBinaryDescComparatorFactory INSTANCE = new HiveShortBinaryDescComparatorFactory();
+
+ private HiveShortBinaryDescComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private short left;
+ private short right;
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ left = LazyUtils.byteArrayToShort(b1, s1);
+ right = LazyUtils.byteArrayToShort(b2, s2);
+ if (left > right)
+ return -1;
+ else if (left == right)
+ return 0;
+ else
+ return 1;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryAscComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryAscComparatorFactory.java
new file mode 100644
index 0000000..6da9716
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryAscComparatorFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveStringBinaryAscComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveStringBinaryAscComparatorFactory INSTANCE = new HiveStringBinaryAscComparatorFactory();
+
+ private HiveStringBinaryAscComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private VInt leftLen = new VInt();
+ private VInt rightLen = new VInt();
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ LazyUtils.readVInt(b1, s1, leftLen);
+ LazyUtils.readVInt(b2, s2, rightLen);
+
+ if (leftLen.value + leftLen.length != l1 || rightLen.value + rightLen.length != l2)
+ throw new IllegalStateException("parse string: length mismatch, expected "
+ + (leftLen.value + leftLen.length) + ", " + (rightLen.value + rightLen.length)
+ + " but get " + l1 + ", " + l2);
+
+ return Text.Comparator.compareBytes(b1, s1 + leftLen.length, l1 - leftLen.length, b2, s2
+ + rightLen.length, l2 - rightLen.length);
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryDescComparatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryDescComparatorFactory.java
new file mode 100644
index 0000000..c579711
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/comparator/HiveStringBinaryDescComparatorFactory.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.hivesterix.runtime.factory.comparator;
+
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveStringBinaryDescComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveStringBinaryDescComparatorFactory INSTANCE = new HiveStringBinaryDescComparatorFactory();
+
+ private HiveStringBinaryDescComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ private VInt leftLen = new VInt();
+ private VInt rightLen = new VInt();
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ LazyUtils.readVInt(b1, s1, leftLen);
+ LazyUtils.readVInt(b2, s2, rightLen);
+
+ if (leftLen.value + leftLen.length != l1 || rightLen.value + rightLen.length != l2)
+ throw new IllegalStateException("parse string: length mismatch, expected "
+ + (leftLen.value + leftLen.length) + ", " + (rightLen.value + rightLen.length)
+ + " but get " + l1 + ", " + l2);
+
+ return -WritableComparator.compareBytes(b1, s1 + leftLen.length, l1 - leftLen.length, b2, s2
+ + rightLen.length, l2 - rightLen.length);
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionFactory.java
new file mode 100644
index 0000000..d664341
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionFactory.java
@@ -0,0 +1,367 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.AggregationFunctionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class AggregationFunctionFactory implements ICopyAggregateFunctionFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * list of parameters' serialization
+ */
+ private List<String> parametersSerialization = new ArrayList<String>();
+
+ /**
+ * the name of the udf
+ */
+ private String genericUDAFName;
+
+ /**
+ * aggregation mode
+ */
+ private GenericUDAFEvaluator.Mode mode;
+
+ /**
+ * list of type info
+ */
+ private List<TypeInfo> types = new ArrayList<TypeInfo>();
+
+ /**
+ * distinct or not
+ */
+ private boolean distinct;
+
+ /**
+ * the schema of incoming rows
+ */
+ private Schema rowSchema;
+
+ /**
+ * list of parameters
+ */
+ private transient List<ExprNodeDesc> parametersOrigin;
+
+ /**
+ * row inspector
+ */
+ private transient ObjectInspector rowInspector = null;
+
+ /**
+ * output object inspector
+ */
+ private transient ObjectInspector outputInspector = null;
+
+ /**
+ * output object inspector
+ */
+ private transient ObjectInspector outputInspectorPartial = null;
+
+ /**
+ * parameter inspectors
+ */
+ private transient ObjectInspector[] parameterInspectors = null;
+
+ /**
+ * expression desc
+ */
+ private transient HashMap<Long, List<ExprNodeDesc>> parameterExprs = new HashMap<Long, List<ExprNodeDesc>>();
+
+ /**
+ * evaluators
+ */
+ private transient HashMap<Long, ExprNodeEvaluator[]> evaluators = new HashMap<Long, ExprNodeEvaluator[]>();
+
+ /**
+ * cached parameter objects
+ */
+ private transient HashMap<Long, Object[]> cachedParameters = new HashMap<Long, Object[]>();
+
+ /**
+ * cached row object: one per thread
+ */
+ private transient HashMap<Long, LazyObject<? extends ObjectInspector>> cachedRowObjects = new HashMap<Long, LazyObject<? extends ObjectInspector>>();
+
+ /**
+ * we only use lazy serde to do serialization
+ */
+ private transient HashMap<Long, SerDe> serDe = new HashMap<Long, SerDe>();
+
+ /**
+ * udaf evaluators
+ */
+ private transient HashMap<Long, GenericUDAFEvaluator> udafsPartial = new HashMap<Long, GenericUDAFEvaluator>();
+
+ /**
+ * udaf evaluators
+ */
+ private transient HashMap<Long, GenericUDAFEvaluator> udafsComplete = new HashMap<Long, GenericUDAFEvaluator>();
+
+ /**
+ * aggregation function desc
+ */
+ private transient AggregationDesc aggregator;
+
+ /**
+ * @param aggregator
+ * Algebricks function call expression
+ * @param oi
+ * schema
+ */
+ public AggregationFunctionFactory(AggregateFunctionCallExpression expression, Schema oi,
+ IVariableTypeEnvironment env) throws AlgebricksException {
+
+ try {
+ aggregator = (AggregationDesc) ExpressionTranslator.getHiveExpression(expression, env);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException(e.getMessage());
+ }
+ init(aggregator.getParameters(), aggregator.getGenericUDAFName(), aggregator.getMode(),
+ aggregator.getDistinct(), oi);
+ }
+
+ /**
+ * constructor of aggregation function factory
+ *
+ * @param inputs
+ * @param name
+ * @param udafMode
+ * @param distinct
+ * @param oi
+ */
+ private void init(List<ExprNodeDesc> inputs, String name, GenericUDAFEvaluator.Mode udafMode, boolean distinct,
+ Schema oi) {
+ parametersOrigin = inputs;
+ genericUDAFName = name;
+ mode = udafMode;
+ this.distinct = distinct;
+ rowSchema = oi;
+
+ for (ExprNodeDesc input : inputs) {
+ TypeInfo type = input.getTypeInfo();
+ if (type instanceof StructTypeInfo) {
+ types.add(TypeInfoFactory.doubleTypeInfo);
+ } else
+ types.add(type);
+
+ String s = Utilities.serializeExpression(input);
+ parametersSerialization.add(s);
+ }
+ }
+
+ @Override
+ public synchronized ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider)
+ throws AlgebricksException {
+ if (parametersOrigin == null) {
+ Configuration config = new Configuration();
+ config.setClassLoader(this.getClass().getClassLoader());
+ /**
+ * in case of class.forname(...) call in hive code
+ */
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+ parametersOrigin = new ArrayList<ExprNodeDesc>();
+ for (String serialization : parametersSerialization) {
+ parametersOrigin.add(Utilities.deserializeExpression(serialization, config));
+ }
+ }
+
+ /**
+ * exprs
+ */
+ if (parameterExprs == null)
+ parameterExprs = new HashMap<Long, List<ExprNodeDesc>>();
+
+ /**
+ * evaluators
+ */
+ if (evaluators == null)
+ evaluators = new HashMap<Long, ExprNodeEvaluator[]>();
+
+ /**
+ * cached parameter objects
+ */
+ if (cachedParameters == null)
+ cachedParameters = new HashMap<Long, Object[]>();
+
+ /**
+ * cached row object: one per thread
+ */
+ if (cachedRowObjects == null)
+ cachedRowObjects = new HashMap<Long, LazyObject<? extends ObjectInspector>>();
+
+ /**
+ * we only use lazy serde to do serialization
+ */
+ if (serDe == null)
+ serDe = new HashMap<Long, SerDe>();
+
+ /**
+ * UDAF functions
+ */
+ if (udafsComplete == null)
+ udafsComplete = new HashMap<Long, GenericUDAFEvaluator>();
+
+ /**
+ * UDAF functions
+ */
+ if (udafsPartial == null)
+ udafsPartial = new HashMap<Long, GenericUDAFEvaluator>();
+
+ if (parameterInspectors == null)
+ parameterInspectors = new ObjectInspector[parametersOrigin.size()];
+
+ if (rowInspector == null)
+ rowInspector = rowSchema.toObjectInspector();
+
+ // get current thread id
+ long threadId = Thread.currentThread().getId();
+
+ /**
+ * expressions, expressions are thread local
+ */
+ List<ExprNodeDesc> parameters = parameterExprs.get(threadId);
+ if (parameters == null) {
+ parameters = new ArrayList<ExprNodeDesc>();
+ for (ExprNodeDesc parameter : parametersOrigin)
+ parameters.add(parameter.clone());
+ parameterExprs.put(threadId, parameters);
+ }
+
+ /**
+ * cached parameter objects
+ */
+ Object[] cachedParas = cachedParameters.get(threadId);
+ if (cachedParas == null) {
+ cachedParas = new Object[parameters.size()];
+ cachedParameters.put(threadId, cachedParas);
+ }
+
+ /**
+ * cached row object: one per thread
+ */
+ LazyObject<? extends ObjectInspector> cachedRowObject = cachedRowObjects.get(threadId);
+ if (cachedRowObject == null) {
+ cachedRowObject = LazyFactory.createLazyObject(rowInspector);
+ cachedRowObjects.put(threadId, cachedRowObject);
+ }
+
+ /**
+ * we only use lazy serde to do serialization
+ */
+ SerDe lazySer = serDe.get(threadId);
+ if (lazySer == null) {
+ lazySer = new LazySerDe();
+ serDe.put(threadId, lazySer);
+ }
+
+ /**
+ * evaluators
+ */
+ ExprNodeEvaluator[] evals = evaluators.get(threadId);
+ if (evals == null) {
+ evals = new ExprNodeEvaluator[parameters.size()];
+ evaluators.put(threadId, evals);
+ }
+
+ GenericUDAFEvaluator udafPartial;
+ GenericUDAFEvaluator udafComplete;
+
+ // initialize object inspectors
+ try {
+ /**
+ * evaluators, udf, object inpsectors are shared in one thread
+ */
+ for (int i = 0; i < evals.length; i++) {
+ if (evals[i] == null) {
+ evals[i] = ExprNodeEvaluatorFactory.get(parameters.get(i));
+ if (parameterInspectors[i] == null) {
+ parameterInspectors[i] = evals[i].initialize(rowInspector);
+ } else {
+ evals[i].initialize(rowInspector);
+ }
+ }
+ }
+
+ udafComplete = udafsComplete.get(threadId);
+ if (udafComplete == null) {
+ try {
+ udafComplete = FunctionRegistry.getGenericUDAFEvaluator(genericUDAFName, types, distinct, false);
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ udafsComplete.put(threadId, udafComplete);
+ udafComplete.init(mode, parameterInspectors);
+ }
+
+ // multiple stage group by, determined by the mode parameter
+ if (outputInspector == null)
+ outputInspector = udafComplete.init(mode, parameterInspectors);
+
+ // initial partial gby udaf
+ GenericUDAFEvaluator.Mode partialMode;
+ // adjust mode for external groupby
+ if (mode == GenericUDAFEvaluator.Mode.COMPLETE)
+ partialMode = GenericUDAFEvaluator.Mode.PARTIAL1;
+ else if (mode == GenericUDAFEvaluator.Mode.FINAL)
+ partialMode = GenericUDAFEvaluator.Mode.PARTIAL2;
+ else
+ partialMode = mode;
+ udafPartial = udafsPartial.get(threadId);
+ if (udafPartial == null) {
+ try {
+ udafPartial = FunctionRegistry.getGenericUDAFEvaluator(genericUDAFName, types, distinct, false);
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ udafPartial.init(partialMode, parameterInspectors);
+ udafsPartial.put(threadId, udafPartial);
+ }
+
+ // multiple stage group by, determined by the mode parameter
+ if (outputInspectorPartial == null)
+ outputInspectorPartial = udafPartial.init(partialMode, parameterInspectors);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException(e);
+ }
+
+ return new AggregationFunctionEvaluator(parameters, types, genericUDAFName, mode, distinct, rowInspector,
+ provider.getDataOutput(), evals, parameterInspectors, cachedParas, lazySer, cachedRowObject,
+ udafPartial, udafComplete, outputInspector, outputInspectorPartial);
+ }
+
+ public String toString() {
+ return "aggregation function expression evaluator factory: " + this.genericUDAFName;
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionSerializableFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionSerializableFactory.java
new file mode 100644
index 0000000..54a1155
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/AggregationFunctionSerializableFactory.java
@@ -0,0 +1,366 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.AggregatuibFunctionSerializableEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hivesterix.serde.lazy.LazyFactory;
+import edu.uci.ics.hivesterix.serde.lazy.LazyObject;
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class AggregationFunctionSerializableFactory implements ICopySerializableAggregateFunctionFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * list of parameters' serialization
+ */
+ private List<String> parametersSerialization = new ArrayList<String>();
+
+ /**
+ * the name of the udf
+ */
+ private String genericUDAFName;
+
+ /**
+ * aggregation mode
+ */
+ private GenericUDAFEvaluator.Mode mode;
+
+ /**
+ * list of type info
+ */
+ private List<TypeInfo> types = new ArrayList<TypeInfo>();
+
+ /**
+ * distinct or not
+ */
+ private boolean distinct;
+
+ /**
+ * the schema of incoming rows
+ */
+ private Schema rowSchema;
+
+ /**
+ * list of parameters
+ */
+ private transient List<ExprNodeDesc> parametersOrigin;
+
+ /**
+ * row inspector
+ */
+ private transient ObjectInspector rowInspector = null;
+
+ /**
+ * output object inspector
+ */
+ private transient ObjectInspector outputInspector = null;
+
+ /**
+ * output object inspector
+ */
+ private transient ObjectInspector outputInspectorPartial = null;
+
+ /**
+ * parameter inspectors
+ */
+ private transient ObjectInspector[] parameterInspectors = null;
+
+ /**
+ * expression desc
+ */
+ private transient HashMap<Long, List<ExprNodeDesc>> parameterExprs = new HashMap<Long, List<ExprNodeDesc>>();
+
+ /**
+ * evaluators
+ */
+ private transient HashMap<Long, ExprNodeEvaluator[]> evaluators = new HashMap<Long, ExprNodeEvaluator[]>();
+
+ /**
+ * cached parameter objects
+ */
+ private transient HashMap<Long, Object[]> cachedParameters = new HashMap<Long, Object[]>();
+
+ /**
+ * cached row object: one per thread
+ */
+ private transient HashMap<Long, LazyObject<? extends ObjectInspector>> cachedRowObjects = new HashMap<Long, LazyObject<? extends ObjectInspector>>();
+
+ /**
+ * we only use lazy serde to do serialization
+ */
+ private transient HashMap<Long, SerDe> serDe = new HashMap<Long, SerDe>();
+
+ /**
+ * udaf evaluators
+ */
+ private transient HashMap<Long, GenericUDAFEvaluator> udafsPartial = new HashMap<Long, GenericUDAFEvaluator>();
+
+ /**
+ * udaf evaluators
+ */
+ private transient HashMap<Long, GenericUDAFEvaluator> udafsComplete = new HashMap<Long, GenericUDAFEvaluator>();
+
+ /**
+ * aggregation function desc
+ */
+ private transient AggregationDesc aggregator;
+
+ /**
+ * @param aggregator
+ * Algebricks function call expression
+ * @param oi
+ * schema
+ */
+ public AggregationFunctionSerializableFactory(AggregateFunctionCallExpression expression, Schema oi,
+ IVariableTypeEnvironment env) throws AlgebricksException {
+
+ try {
+ aggregator = (AggregationDesc) ExpressionTranslator.getHiveExpression(expression, env);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException(e.getMessage());
+ }
+ init(aggregator.getParameters(), aggregator.getGenericUDAFName(), aggregator.getMode(),
+ aggregator.getDistinct(), oi);
+ }
+
+ /**
+ * constructor of aggregation function factory
+ *
+ * @param inputs
+ * @param name
+ * @param udafMode
+ * @param distinct
+ * @param oi
+ */
+ private void init(List<ExprNodeDesc> inputs, String name, GenericUDAFEvaluator.Mode udafMode, boolean distinct,
+ Schema oi) {
+ parametersOrigin = inputs;
+ genericUDAFName = name;
+ mode = udafMode;
+ this.distinct = distinct;
+ rowSchema = oi;
+
+ for (ExprNodeDesc input : inputs) {
+ TypeInfo type = input.getTypeInfo();
+ if (type instanceof StructTypeInfo) {
+ types.add(TypeInfoFactory.doubleTypeInfo);
+ } else
+ types.add(type);
+
+ String s = Utilities.serializeExpression(input);
+ parametersSerialization.add(s);
+ }
+ }
+
+ @Override
+ public synchronized ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ if (parametersOrigin == null) {
+ Configuration config = new Configuration();
+ config.setClassLoader(this.getClass().getClassLoader());
+ /**
+ * in case of class.forname(...) call in hive code
+ */
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+ parametersOrigin = new ArrayList<ExprNodeDesc>();
+ for (String serialization : parametersSerialization) {
+ parametersOrigin.add(Utilities.deserializeExpression(serialization, config));
+ }
+ }
+
+ /**
+ * exprs
+ */
+ if (parameterExprs == null)
+ parameterExprs = new HashMap<Long, List<ExprNodeDesc>>();
+
+ /**
+ * evaluators
+ */
+ if (evaluators == null)
+ evaluators = new HashMap<Long, ExprNodeEvaluator[]>();
+
+ /**
+ * cached parameter objects
+ */
+ if (cachedParameters == null)
+ cachedParameters = new HashMap<Long, Object[]>();
+
+ /**
+ * cached row object: one per thread
+ */
+ if (cachedRowObjects == null)
+ cachedRowObjects = new HashMap<Long, LazyObject<? extends ObjectInspector>>();
+
+ /**
+ * we only use lazy serde to do serialization
+ */
+ if (serDe == null)
+ serDe = new HashMap<Long, SerDe>();
+
+ /**
+ * UDAF functions
+ */
+ if (udafsComplete == null)
+ udafsComplete = new HashMap<Long, GenericUDAFEvaluator>();
+
+ /**
+ * UDAF functions
+ */
+ if (udafsPartial == null)
+ udafsPartial = new HashMap<Long, GenericUDAFEvaluator>();
+
+ if (parameterInspectors == null)
+ parameterInspectors = new ObjectInspector[parametersOrigin.size()];
+
+ if (rowInspector == null)
+ rowInspector = rowSchema.toObjectInspector();
+
+ // get current thread id
+ long threadId = Thread.currentThread().getId();
+
+ /**
+ * expressions, expressions are thread local
+ */
+ List<ExprNodeDesc> parameters = parameterExprs.get(threadId);
+ if (parameters == null) {
+ parameters = new ArrayList<ExprNodeDesc>();
+ for (ExprNodeDesc parameter : parametersOrigin)
+ parameters.add(parameter.clone());
+ parameterExprs.put(threadId, parameters);
+ }
+
+ /**
+ * cached parameter objects
+ */
+ Object[] cachedParas = cachedParameters.get(threadId);
+ if (cachedParas == null) {
+ cachedParas = new Object[parameters.size()];
+ cachedParameters.put(threadId, cachedParas);
+ }
+
+ /**
+ * cached row object: one per thread
+ */
+ LazyObject<? extends ObjectInspector> cachedRowObject = cachedRowObjects.get(threadId);
+ if (cachedRowObject == null) {
+ cachedRowObject = LazyFactory.createLazyObject(rowInspector);
+ cachedRowObjects.put(threadId, cachedRowObject);
+ }
+
+ /**
+ * we only use lazy serde to do serialization
+ */
+ SerDe lazySer = serDe.get(threadId);
+ if (lazySer == null) {
+ lazySer = new LazySerDe();
+ serDe.put(threadId, lazySer);
+ }
+
+ /**
+ * evaluators
+ */
+ ExprNodeEvaluator[] evals = evaluators.get(threadId);
+ if (evals == null) {
+ evals = new ExprNodeEvaluator[parameters.size()];
+ evaluators.put(threadId, evals);
+ }
+
+ GenericUDAFEvaluator udafPartial;
+ GenericUDAFEvaluator udafComplete;
+
+ // initialize object inspectors
+ try {
+ /**
+ * evaluators, udf, object inpsectors are shared in one thread
+ */
+ for (int i = 0; i < evals.length; i++) {
+ if (evals[i] == null) {
+ evals[i] = ExprNodeEvaluatorFactory.get(parameters.get(i));
+ if (parameterInspectors[i] == null) {
+ parameterInspectors[i] = evals[i].initialize(rowInspector);
+ } else {
+ evals[i].initialize(rowInspector);
+ }
+ }
+ }
+
+ udafComplete = udafsComplete.get(threadId);
+ if (udafComplete == null) {
+ try {
+ udafComplete = FunctionRegistry.getGenericUDAFEvaluator(genericUDAFName, types, distinct, false);
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ udafsComplete.put(threadId, udafComplete);
+ udafComplete.init(mode, parameterInspectors);
+ }
+
+ // multiple stage group by, determined by the mode parameter
+ if (outputInspector == null)
+ outputInspector = udafComplete.init(mode, parameterInspectors);
+
+ // initial partial gby udaf
+ GenericUDAFEvaluator.Mode partialMode;
+ // adjust mode for external groupby
+ if (mode == GenericUDAFEvaluator.Mode.COMPLETE)
+ partialMode = GenericUDAFEvaluator.Mode.PARTIAL1;
+ else if (mode == GenericUDAFEvaluator.Mode.FINAL)
+ partialMode = GenericUDAFEvaluator.Mode.PARTIAL2;
+ else
+ partialMode = mode;
+ udafPartial = udafsPartial.get(threadId);
+ if (udafPartial == null) {
+ try {
+ udafPartial = FunctionRegistry.getGenericUDAFEvaluator(genericUDAFName, types, distinct, false);
+ } catch (HiveException e) {
+ throw new AlgebricksException(e);
+ }
+ udafPartial.init(partialMode, parameterInspectors);
+ udafsPartial.put(threadId, udafPartial);
+ }
+
+ // multiple stage group by, determined by the mode parameter
+ if (outputInspectorPartial == null)
+ outputInspectorPartial = udafPartial.init(partialMode, parameterInspectors);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException(e);
+ }
+
+ return new AggregatuibFunctionSerializableEvaluator(parameters, types, genericUDAFName, mode, distinct,
+ rowInspector, evals, parameterInspectors, cachedParas, lazySer, cachedRowObject, udafPartial,
+ udafComplete, outputInspector, outputInspectorPartial);
+ }
+
+ public String toString() {
+ return "aggregation function expression evaluator factory: " + this.genericUDAFName;
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ColumnExpressionEvaluatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ColumnExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..6f51bfe
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ColumnExpressionEvaluatorFactory.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.ColumnExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ColumnExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private ExprNodeColumnDesc expr;
+
+ private Schema inputSchema;
+
+ public ColumnExpressionEvaluatorFactory(ILogicalExpression expression, Schema schema, IVariableTypeEnvironment env)
+ throws AlgebricksException {
+ try {
+ expr = (ExprNodeColumnDesc) ExpressionTranslator.getHiveExpression(expression, env);
+ } catch (Exception e) {
+ throw new AlgebricksException(e.getMessage());
+ }
+ inputSchema = schema;
+ }
+
+ public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+ return new ColumnExpressionEvaluator(expr, inputSchema.toObjectInspector(), output);
+ }
+
+ public String toString() {
+ return "column expression evaluator factory: " + expr.toString();
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ConstantExpressionEvaluatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ConstantExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..4ecdb70
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ConstantExpressionEvaluatorFactory.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.ConstantExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ConstantExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private ExprNodeConstantDesc expr;
+
+ private Schema schema;
+
+ public ConstantExpressionEvaluatorFactory(ILogicalExpression expression, Schema inputSchema,
+ IVariableTypeEnvironment env) throws AlgebricksException {
+ try {
+ expr = (ExprNodeConstantDesc) ExpressionTranslator.getHiveExpression(expression, env);
+ } catch (Exception e) {
+ throw new AlgebricksException(e.getMessage());
+ }
+ schema = inputSchema;
+ }
+
+ public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+ return new ConstantExpressionEvaluator(expr, schema.toObjectInspector(), output);
+ }
+
+ public String toString() {
+ return "constant expression evaluator factory: " + expr.toString();
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/FieldExpressionEvaluatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/FieldExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..ef0c104
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/FieldExpressionEvaluatorFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.FieldExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class FieldExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private ExprNodeFieldDesc expr;
+
+ private Schema inputSchema;
+
+ public FieldExpressionEvaluatorFactory(ILogicalExpression expression, Schema schema, IVariableTypeEnvironment env)
+ throws AlgebricksException {
+ try {
+ expr = (ExprNodeFieldDesc) ExpressionTranslator.getHiveExpression(expression, env);
+ } catch (Exception e) {
+ throw new AlgebricksException(e.getMessage());
+ }
+ inputSchema = schema;
+ }
+
+ public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+ return new FieldExpressionEvaluator(expr, inputSchema.toObjectInspector(), output);
+ }
+
+ public String toString() {
+ return "field access expression evaluator factory: " + expr.toString();
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/HiveExpressionRuntimeProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/HiveExpressionRuntimeProvider.java
new file mode 100644
index 0000000..c3b4b17
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/HiveExpressionRuntimeProvider.java
@@ -0,0 +1,167 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionConstant;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.AggregateFunctionFactoryAdapter;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.UnnestingFunctionFactoryAdapter;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+
+public class HiveExpressionRuntimeProvider implements IExpressionRuntimeProvider {
+
+ public static final IExpressionRuntimeProvider INSTANCE = new HiveExpressionRuntimeProvider();
+
+ @Override
+ public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+ IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+ throws AlgebricksException {
+ Schema schema = this.getSchema(inputSchemas[0], env);
+ return new AggregateFunctionFactoryAdapter(new AggregationFunctionFactory(expr, schema, env));
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
+ JobGenContext context) throws AlgebricksException {
+ Schema schema = this.getSchema(inputSchemas[0], env);
+ return new AggregationFunctionSerializableFactory(expr, schema, env);
+ }
+
+ @Override
+ public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
+ IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+ throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+ IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+ throws AlgebricksException {
+ Schema schema = this.getSchema(inputSchemas[0], env);
+ return new UnnestingFunctionFactoryAdapter(new UnnestingFunctionFactory(expr, schema, env));
+ }
+
+ public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
+ IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+ switch (expr.getExpressionTag()) {
+ case VARIABLE: {
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ return new ScalarEvaluatorFactoryAdapter(createVariableEvaluatorFactory(v, env, inputSchemas, context));
+ }
+ case CONSTANT: {
+ ConstantExpression c = (ConstantExpression) expr;
+ return new ScalarEvaluatorFactoryAdapter(createConstantEvaluatorFactory(c, env, inputSchemas, context));
+ }
+ case FUNCTION_CALL: {
+ AbstractFunctionCallExpression fun = (AbstractFunctionCallExpression) expr;
+ FunctionIdentifier fid = fun.getFunctionIdentifier();
+
+ if (fid.getName().equals(ExpressionConstant.FIELDACCESS)) {
+ return new ScalarEvaluatorFactoryAdapter(createFieldExpressionEvaluatorFactory(fun, env,
+ inputSchemas, context));
+ }
+
+ if (fid.getName().equals(ExpressionConstant.FIELDACCESS)) {
+ return new ScalarEvaluatorFactoryAdapter(createNullExpressionEvaluatorFactory(fun, env,
+ inputSchemas, context));
+ }
+
+ if (fun.getKind() == FunctionKind.SCALAR) {
+ ScalarFunctionCallExpression scalar = (ScalarFunctionCallExpression) fun;
+ return new ScalarEvaluatorFactoryAdapter(createScalarFunctionEvaluatorFactory(scalar, env,
+ inputSchemas, context));
+ } else {
+ throw new AlgebricksException("Cannot create evaluator for function " + fun + " of kind "
+ + fun.getKind());
+ }
+ }
+ default: {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private ICopyEvaluatorFactory createVariableEvaluatorFactory(VariableReferenceExpression expr,
+ IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+ throws AlgebricksException {
+ Schema schema = this.getSchema(inputSchemas[0], env);
+ return new ColumnExpressionEvaluatorFactory(expr, schema, env);
+ }
+
+ private ICopyEvaluatorFactory createScalarFunctionEvaluatorFactory(AbstractFunctionCallExpression expr,
+ IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+ throws AlgebricksException {
+ List<String> names = new ArrayList<String>();
+ List<TypeInfo> types = new ArrayList<TypeInfo>();
+ for (IOperatorSchema inputSchema : inputSchemas) {
+ Schema schema = this.getSchema(inputSchema, env);
+ names.addAll(schema.getNames());
+ types.addAll(schema.getTypes());
+ }
+ Schema inputSchema = new Schema(names, types);
+ return new ScalarFunctionExpressionEvaluatorFactory(expr, inputSchema, env);
+ }
+
+ private ICopyEvaluatorFactory createFieldExpressionEvaluatorFactory(AbstractFunctionCallExpression expr,
+ IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+ throws AlgebricksException {
+ Schema schema = this.getSchema(inputSchemas[0], env);
+ return new FieldExpressionEvaluatorFactory(expr, schema, env);
+ }
+
+ private ICopyEvaluatorFactory createNullExpressionEvaluatorFactory(AbstractFunctionCallExpression expr,
+ IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+ throws AlgebricksException {
+ Schema schema = this.getSchema(inputSchemas[0], env);
+ return new NullExpressionEvaluatorFactory(expr, schema, env);
+ }
+
+ private ICopyEvaluatorFactory createConstantEvaluatorFactory(ConstantExpression expr, IVariableTypeEnvironment env,
+ IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+ Schema schema = this.getSchema(inputSchemas[0], env);
+ return new ConstantExpressionEvaluatorFactory(expr, schema, env);
+ }
+
+ private Schema getSchema(IOperatorSchema inputSchema, IVariableTypeEnvironment env) throws AlgebricksException {
+ List<String> names = new ArrayList<String>();
+ List<TypeInfo> types = new ArrayList<TypeInfo>();
+ Iterator<LogicalVariable> variables = inputSchema.iterator();
+ while (variables.hasNext()) {
+ LogicalVariable var = variables.next();
+ names.add(var.toString());
+ types.add((TypeInfo) env.getVarType(var));
+ }
+
+ Schema schema = new Schema(names, types);
+ return schema;
+ }
+
+}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/NullExpressionEvaluatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/NullExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..f37b825
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/NullExpressionEvaluatorFactory.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.NullExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class NullExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private ExprNodeNullDesc expr;
+
+ private Schema schema;
+
+ public NullExpressionEvaluatorFactory(ILogicalExpression expression, Schema intputSchema,
+ IVariableTypeEnvironment env) throws AlgebricksException {
+ try {
+ expr = (ExprNodeNullDesc) ExpressionTranslator.getHiveExpression(expression, env);
+ } catch (Exception e) {
+ throw new AlgebricksException(e.getMessage());
+ }
+ schema = intputSchema;
+ }
+
+ public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+ return new NullExpressionEvaluator(expr, schema.toObjectInspector(), output);
+ }
+
+ public String toString() {
+ return "null expression evaluator factory: " + expr.toString();
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ScalarFunctionExpressionEvaluatorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ScalarFunctionExpressionEvaluatorFactory.java
new file mode 100644
index 0000000..cbac10a
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/ScalarFunctionExpressionEvaluatorFactory.java
@@ -0,0 +1,69 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.FunctionExpressionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class ScalarFunctionExpressionEvaluatorFactory implements ICopyEvaluatorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient ExprNodeGenericFuncDesc expr;
+
+ private String exprSerialization;
+
+ private Schema inputSchema;
+
+ private transient Configuration config;
+
+ public ScalarFunctionExpressionEvaluatorFactory(ILogicalExpression expression, Schema schema,
+ IVariableTypeEnvironment env) throws AlgebricksException {
+ try {
+ expr = (ExprNodeGenericFuncDesc) ExpressionTranslator.getHiveExpression(expression, env);
+
+ exprSerialization = Utilities.serializeExpression(expr);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException(e.getMessage());
+ }
+ inputSchema = schema;
+ }
+
+ public synchronized ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+ if (expr == null) {
+ configClassLoader();
+ expr = (ExprNodeGenericFuncDesc) Utilities.deserializeExpression(exprSerialization, config);
+ }
+
+ ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) expr.clone();
+ return new FunctionExpressionEvaluator(funcDesc, inputSchema.toObjectInspector(), output);
+ }
+
+ private void configClassLoader() {
+ config = new Configuration();
+ ClassLoader loader = this.getClass().getClassLoader();
+ config.setClassLoader(loader);
+ Thread.currentThread().setContextClassLoader(loader);
+ }
+
+ public String toString() {
+ if (expr == null) {
+ configClassLoader();
+ expr = (ExprNodeGenericFuncDesc) Utilities.deserializeExpression(exprSerialization, new Configuration());
+ }
+
+ return "function expression evaluator factory: " + expr.getExprString();
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/UnnestingFunctionFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/UnnestingFunctionFactory.java
new file mode 100644
index 0000000..3b22513
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/evaluator/UnnestingFunctionFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.factory.evaluator;
+
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.ExpressionTranslator;
+import edu.uci.ics.hivesterix.runtime.evaluator.UDTFFunctionEvaluator;
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class UnnestingFunctionFactory implements ICopyUnnestingFunctionFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private UDTFDesc expr;
+
+ private Schema inputSchema;
+
+ private int[] columns;
+
+ public UnnestingFunctionFactory(ILogicalExpression expression, Schema schema, IVariableTypeEnvironment env)
+ throws AlgebricksException {
+ try {
+ expr = (UDTFDesc) ExpressionTranslator.getHiveExpression(expression, env);
+ } catch (Exception e) {
+ throw new AlgebricksException(e.getMessage());
+ }
+ inputSchema = schema;
+ }
+
+ @Override
+ public ICopyUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException {
+ return new UDTFFunctionEvaluator(expr, inputSchema, columns, provider.getDataOutput());
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveDoubleBinaryHashFunctionFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveDoubleBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..b636009
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveDoubleBinaryHashFunctionFactory.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveDoubleBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveDoubleBinaryHashFunctionFactory INSTANCE = new HiveDoubleBinaryHashFunctionFactory();
+
+ private HiveDoubleBinaryHashFunctionFactory() {
+ }
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+ // TODO Auto-generated method stub
+ return new IBinaryHashFunction() {
+ private Double value;
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ value = Double.longBitsToDouble(LazyUtils.byteArrayToLong(bytes, offset));
+ return value.hashCode();
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveIntegerBinaryHashFunctionFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveIntegerBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..90e6ce4
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveIntegerBinaryHashFunctionFactory.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveIntegerBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static IBinaryHashFunctionFactory INSTANCE = new HiveIntegerBinaryHashFunctionFactory();
+
+ private HiveIntegerBinaryHashFunctionFactory() {
+ }
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+
+ return new IBinaryHashFunction() {
+ private VInt value = new VInt();
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ LazyUtils.readVInt(bytes, offset, value);
+ if (value.length != length)
+ throw new IllegalArgumentException("length mismatch in int hash function actual: " + length
+ + " expected " + value.length);
+ return value.value;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveLongBinaryHashFunctionFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveLongBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..1b61f67
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveLongBinaryHashFunctionFactory.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VLong;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveLongBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static IBinaryHashFunctionFactory INSTANCE = new HiveLongBinaryHashFunctionFactory();
+
+ private HiveLongBinaryHashFunctionFactory() {
+ }
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+
+ return new IBinaryHashFunction() {
+ private VLong value = new VLong();
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ LazyUtils.readVLong(bytes, offset, value);
+ return (int) value.value;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveRawBinaryHashFunctionFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveRawBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..f2b7b44
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveRawBinaryHashFunctionFactory.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveRawBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static IBinaryHashFunctionFactory INSTANCE = new HiveRawBinaryHashFunctionFactory();
+
+ private HiveRawBinaryHashFunctionFactory() {
+
+ }
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+
+ return new IBinaryHashFunction() {
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ int value = 1;
+ int end = offset + length;
+ for (int i = offset; i < end; i++)
+ value = value * 31 + (int) bytes[i];
+ return value;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveStingBinaryHashFunctionFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveStingBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..a9cf6fd
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/HiveStingBinaryHashFunctionFactory.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveStingBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static HiveStingBinaryHashFunctionFactory INSTANCE = new HiveStingBinaryHashFunctionFactory();
+
+ private HiveStingBinaryHashFunctionFactory() {
+ }
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+ // TODO Auto-generated method stub
+ return new IBinaryHashFunction() {
+ private VInt len = new VInt();
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ LazyUtils.readVInt(bytes, offset, len);
+ if (len.value + len.length != length)
+ throw new IllegalStateException("parse string: length mismatch, expected "
+ + (len.value + len.length) + " but get " + length);
+ return hashBytes(bytes, offset + len.length, length - len.length);
+ }
+
+ public int hashBytes(byte[] bytes, int offset, int length) {
+ int value = 1;
+ int end = offset + length;
+ for (int i = offset; i < end; i++)
+ value = value * 31 + (int) bytes[i];
+ return value;
+ }
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java
new file mode 100644
index 0000000..760a614
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/hashfunction/MurmurHash3BinaryHashFunctionFamily.java
@@ -0,0 +1,63 @@
+package edu.uci.ics.hivesterix.runtime.factory.hashfunction;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
+
+ public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
+
+ private static final long serialVersionUID = 1L;
+
+ private MurmurHash3BinaryHashFunctionFamily() {
+ }
+
+ private static final int C1 = 0xcc9e2d51;
+ private static final int C2 = 0x1b873593;
+ private static final int C3 = 5;
+ private static final int C4 = 0xe6546b64;
+ private static final int C5 = 0x85ebca6b;
+ private static final int C6 = 0xc2b2ae35;
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+ return new IBinaryHashFunction() {
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ int h = seed;
+ int p = offset;
+ int remain = length;
+ while (remain >= 4) {
+ int k = (bytes[p] & 0xff) | ((bytes[p + 1] & 0xff) << 8) | ((bytes[p + 2] & 0xff) << 16)
+ | ((bytes[p + 3] & 0xff) << 24);
+ k *= C1;
+ k = Integer.rotateLeft(k, 15);
+ k *= C2;
+ h ^= k;
+ h = Integer.rotateLeft(h, 13);
+ h = h * C3 + C4;
+ p += 4;
+ remain -= 4;
+ }
+ if (remain > 0) {
+ int k = 0;
+ for (int i = 0; remain > 0; i += 8) {
+ k ^= (bytes[p++] & 0xff) << i;
+ remain--;
+ }
+ k *= C1;
+ k = Integer.rotateLeft(k, 15);
+ k *= C2;
+ h ^= k;
+ }
+ h ^= length;
+ h ^= (h >>> 16);
+ h *= C5;
+ h ^= (h >>> 13);
+ h *= C6;
+ h ^= (h >>> 16);
+ return h;
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleAscNormalizedKeyComputerFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..6ac012f
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveDoubleAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+ return new INormalizedKeyComputer() {
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ int header = LazyUtils.byteArrayToInt(bytes, start);
+ long unsignedValue = (long) header;
+ return (int) ((unsignedValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleDescNormalizedKeyComputerFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..3044109
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveDoubleDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveDoubleDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final INormalizedKeyComputerFactory ascNormalizedKeyComputerFactory = new HiveDoubleAscNormalizedKeyComputerFactory();
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+ private INormalizedKeyComputer nmkComputer = ascNormalizedKeyComputerFactory.createNormalizedKeyComputer();
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ int nk = nmkComputer.normalize(bytes, start, length);
+ return (int) ((long) Integer.MAX_VALUE - (long) (nk - Integer.MIN_VALUE));
+ }
+
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerAscNormalizedKeyComputerFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..a1d4d48
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveIntegerAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+ return new INormalizedKeyComputer() {
+ private VInt vint = new VInt();
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ LazyUtils.readVInt(bytes, start, vint);
+ if (vint.length != length)
+ throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+ + vint.length + " expected " + length);
+ long unsignedValue = (long) vint.value;
+ return (int) ((unsignedValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerDescNormalizedKeyComputerFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..b8a30a8
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveIntegerDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveIntegerDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+ return new INormalizedKeyComputer() {
+ private VInt vint = new VInt();
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ LazyUtils.readVInt(bytes, start, vint);
+ if (vint.length != length)
+ throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+ + vint.length + " expected " + length);
+ long unsignedValue = (long) vint.value;
+ return (int) ((long) 0xffffffff - unsignedValue);
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongAscNormalizedKeyComputerFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..a893d19
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,63 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VLong;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveLongAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+ return new INormalizedKeyComputer() {
+ private static final int POSTIVE_LONG_MASK = (3 << 30);
+ private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+ private static final int NEGATIVE_LONG_MASK = (0 << 30);
+ private VLong vlong = new VLong();
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ LazyUtils.readVLong(bytes, start, vlong);
+ if (vlong.length != length)
+ throw new IllegalArgumentException("length mismatch in int comparator function actual: "
+ + vlong.length + " expected " + length);
+ long value = (long) vlong.value;
+ int highValue = (int) (value >> 32);
+ if (highValue > 0) {
+ /**
+ * larger than Integer.MAX
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSTIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /**
+ * smaller than Integer.MAX but >=0
+ */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /**
+ * less than 0; TODO: have not optimized for that
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ }
+
+ private int getKey(int value) {
+ long unsignedFirstValue = (long) value;
+ int nmk = (int) ((unsignedFirstValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+ return nmk;
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongDescNormalizedKeyComputerFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..cc5661b
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveLongDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveLongDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final INormalizedKeyComputerFactory ascNormalizedKeyComputerFactory = new HiveIntegerAscNormalizedKeyComputerFactory();
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+ private INormalizedKeyComputer nmkComputer = ascNormalizedKeyComputerFactory.createNormalizedKeyComputer();
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ int nk = nmkComputer.normalize(bytes, start, length);
+ return (int) ((long) Integer.MAX_VALUE - (long) (nk - Integer.MIN_VALUE));
+ }
+
+ };
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringAscNormalizedKeyComputerFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringAscNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..d0429d6
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringAscNormalizedKeyComputerFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class HiveStringAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+
+ return new INormalizedKeyComputer() {
+ private VInt len = new VInt();
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ LazyUtils.readVInt(bytes, start, len);
+
+ if (len.value + len.length != length)
+ throw new IllegalStateException("parse string: length mismatch, expected "
+ + (len.value + len.length) + " but get " + length);
+ int nk = 0;
+ int offset = start + len.length;
+ for (int i = 0; i < 2; ++i) {
+ nk <<= 16;
+ if (i < len.value) {
+ char character = UTF8StringPointable.charAt(bytes, offset);
+ nk += ((int) character) & 0xffff;
+ offset += UTF8StringPointable.charSize(bytes, offset);
+ }
+ }
+ return nk;
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringDescNormalizedKeyComputerFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringDescNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..15b2d27
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/normalize/HiveStringDescNormalizedKeyComputerFactory.java
@@ -0,0 +1,37 @@
+package edu.uci.ics.hivesterix.runtime.factory.normalize;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class HiveStringDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+ private VInt len = new VInt();
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ LazyUtils.readVInt(bytes, start, len);
+ if (len.value + len.length != length)
+ throw new IllegalStateException("parse string: length mismatch, expected "
+ + (len.value + len.length) + " but get " + length);
+ int nk = 0;
+ int offset = start + len.length;
+ for (int i = 0; i < 2; ++i) {
+ nk <<= 16;
+ if (i < len.value) {
+ nk += ((int) UTF8StringPointable.charAt(bytes, offset)) & 0xffff;
+ offset += UTF8StringPointable.charSize(bytes, offset);
+ }
+ }
+ return (int) ((long) 0xffffffff - (long) nk);
+ }
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/nullwriter/HiveNullWriterFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/nullwriter/HiveNullWriterFactory.java
new file mode 100644
index 0000000..590bd61
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/factory/nullwriter/HiveNullWriterFactory.java
@@ -0,0 +1,28 @@
+package edu.uci.ics.hivesterix.runtime.factory.nullwriter;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class HiveNullWriterFactory implements INullWriterFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public static HiveNullWriterFactory INSTANCE = new HiveNullWriterFactory();
+
+ @Override
+ public INullWriter createNullWriter() {
+ return new HiveNullWriter();
+ }
+}
+
+class HiveNullWriter implements INullWriter {
+
+ @Override
+ public void writeNull(DataOutput out) throws HyracksDataException {
+ // do nothing
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspector.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspector.java
new file mode 100644
index 0000000..677e20e
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspector.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hivesterix.runtime.inspector;
+
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+
+public class HiveBinaryBooleanInspector implements IBinaryBooleanInspector {
+
+ HiveBinaryBooleanInspector() {
+ }
+
+ @Override
+ public boolean getBooleanValue(byte[] bytes, int offset, int length) {
+ if (length == 0)
+ return false;
+ if (length != 1)
+ throw new IllegalStateException("boolean field error: with length " + length);
+ return bytes[0] == 1;
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspectorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspectorFactory.java
new file mode 100644
index 0000000..22a6065
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryBooleanInspectorFactory.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hivesterix.runtime.inspector;
+
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class HiveBinaryBooleanInspectorFactory implements IBinaryBooleanInspectorFactory {
+ private static final long serialVersionUID = 1L;
+ public static HiveBinaryBooleanInspectorFactory INSTANCE = new HiveBinaryBooleanInspectorFactory();
+
+ private HiveBinaryBooleanInspectorFactory() {
+
+ }
+
+ @Override
+ public IBinaryBooleanInspector createBinaryBooleanInspector(IHyracksTaskContext arg0) {
+ return new HiveBinaryBooleanInspector();
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspector.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspector.java
new file mode 100644
index 0000000..555afee
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspector.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.hivesterix.runtime.inspector;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils.VInt;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+
+public class HiveBinaryIntegerInspector implements IBinaryIntegerInspector {
+ private VInt value = new VInt();
+
+ HiveBinaryIntegerInspector() {
+ }
+
+ @Override
+ public int getIntegerValue(byte[] bytes, int offset, int length) {
+ LazyUtils.readVInt(bytes, offset, value);
+ if (value.length != length)
+ throw new IllegalArgumentException("length mismatch in int hash function actual: " + length + " expected "
+ + value.length);
+ return value.value;
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspectorFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspectorFactory.java
new file mode 100644
index 0000000..bb93a60
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/inspector/HiveBinaryIntegerInspectorFactory.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hivesterix.runtime.inspector;
+
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class HiveBinaryIntegerInspectorFactory implements IBinaryIntegerInspectorFactory {
+ private static final long serialVersionUID = 1L;
+ public static HiveBinaryIntegerInspectorFactory INSTANCE = new HiveBinaryIntegerInspectorFactory();
+
+ private HiveBinaryIntegerInspectorFactory() {
+
+ }
+
+ @Override
+ public IBinaryIntegerInspector createBinaryIntegerInspector(IHyracksTaskContext arg0) {
+ return new HiveBinaryIntegerInspector();
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveConnectorPolicyAssignmentPolicy.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..cfceb26
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedBlockingConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class HiveConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+ public enum Policy {
+ PIPELINING,
+ SEND_SIDE_MAT_PIPELINING,
+ SEND_SIDE_MAT_BLOCKING,
+ SEND_SIDE_MAT_RECEIVE_SIDE_MAT_BLOCKING;
+ };
+
+ private static final long serialVersionUID = 1L;
+
+ private final IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+ private final IConnectorPolicy sendSideMatPipeliningPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private final IConnectorPolicy sendSideMatBlockingPolicy = new SendSideMaterializedBlockingConnectorPolicy();
+ private final IConnectorPolicy sendSideMatReceiveSideMatBlockingPolicy = new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
+ private final Policy policy;
+
+ public HiveConnectorPolicyAssignmentPolicy(Policy policy) {
+ this.policy = policy;
+ }
+
+ @Override
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ // avoid deadlocks
+ switch (policy) {
+ case PIPELINING:
+ case SEND_SIDE_MAT_PIPELINING:
+ return sendSideMatPipeliningPolicy;
+ case SEND_SIDE_MAT_BLOCKING:
+ return sendSideMatBlockingPolicy;
+ case SEND_SIDE_MAT_RECEIVE_SIDE_MAT_BLOCKING:
+ return sendSideMatReceiveSideMatBlockingPolicy;
+ default:
+ return sendSideMatPipeliningPolicy;
+ }
+ } else if (c instanceof MToNPartitioningConnectorDescriptor) {
+ // support different repartitioning policies
+ switch (policy) {
+ case PIPELINING:
+ return pipeliningPolicy;
+ case SEND_SIDE_MAT_PIPELINING:
+ return sendSideMatPipeliningPolicy;
+ case SEND_SIDE_MAT_BLOCKING:
+ return sendSideMatBlockingPolicy;
+ case SEND_SIDE_MAT_RECEIVE_SIDE_MAT_BLOCKING:
+ return sendSideMatReceiveSideMatBlockingPolicy;
+ default:
+ return pipeliningPolicy;
+ }
+ } else {
+ // pipelining for other connectors
+ return pipeliningPolicy;
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSink.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSink.java
new file mode 100644
index 0000000..ccc2e6c
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSink.java
@@ -0,0 +1,32 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+
+public class HiveDataSink implements IDataSink {
+
+ private Object[] schema;
+
+ private Object fsOperator;
+
+ public HiveDataSink(Object sink, Object[] sourceSchema) {
+ schema = sourceSchema;
+ fsOperator = sink;
+ }
+
+ @Override
+ public Object getId() {
+ return fsOperator;
+ }
+
+ @Override
+ public Object[] getSchemaTypes() {
+ return schema;
+ }
+
+ public IPartitioningProperty getPartitioningProperty() {
+ return new RandomPartitioningProperty(new HiveDomain());
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSource.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSource.java
new file mode 100644
index 0000000..67b743b
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSource.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+
+public class HiveDataSource<P> implements IDataSource<P> {
+
+ private P source;
+
+ private Object[] schema;
+
+ public HiveDataSource(P dataSource, Object[] sourceSchema) {
+ source = dataSource;
+ schema = sourceSchema;
+ }
+
+ @Override
+ public P getId() {
+ return source;
+ }
+
+ @Override
+ public Object[] getSchemaTypes() {
+ return schema;
+ }
+
+ @Override
+ public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
+ }
+
+ @Override
+ public IDataSourcePropertiesProvider getPropertiesProvider() {
+ return new HiveDataSourcePartitioningProvider();
+ }
+
+ @Override
+ public String toString() {
+ PartitionDesc desc = (PartitionDesc) source;
+ return desc.getTableName();
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSourcePartitioningProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSourcePartitioningProvider.java
new file mode 100644
index 0000000..bb9c4ce
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDataSourcePartitioningProvider.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+public class HiveDataSourcePartitioningProvider implements IDataSourcePropertiesProvider {
+
+ @Override
+ public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
+ IPartitioningProperty property = new RandomPartitioningProperty(new HiveDomain());
+ IPhysicalPropertiesVector vector = new StructuralPropertiesVector(property,
+ new LinkedList<ILocalStructuralProperty>());
+ return vector;
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDomain.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDomain.java
new file mode 100644
index 0000000..8b1d3b5
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveDomain.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class HiveDomain implements INodeDomain {
+
+ @Override
+ public boolean sameAs(INodeDomain domain) {
+ return true;
+ }
+
+ @Override
+ public Integer cardinality() {
+ return 0;
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
new file mode 100644
index 0000000..13526d8
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -0,0 +1,130 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+
+import edu.uci.ics.hivesterix.logical.expression.HiveFunctionInfo;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveScanRuntimeGenerator;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveWriteRuntimeGenerator;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+@SuppressWarnings("rawtypes")
+public class HiveMetaDataProvider<S, T> implements IMetadataProvider<S, T> {
+
+ private Operator fileSink;
+ private Schema outputSchema;
+ private HashMap<S, IDataSource<S>> dataSourceMap;
+
+ public HiveMetaDataProvider(Operator fsOp, Schema oi, HashMap<S, IDataSource<S>> map) {
+ fileSink = fsOp;
+ outputSchema = oi;
+ dataSourceMap = map;
+ }
+
+ @Override
+ public IDataSourceIndex<T, S> findDataSourceIndex(T indexId, S dataSourceId) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public IDataSource<S> findDataSource(S id) throws AlgebricksException {
+ return dataSourceMap.get(id);
+ }
+
+ @Override
+ public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource) {
+ return true;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
+ List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException {
+
+ S desc = dataSource.getId();
+ HiveScanRuntimeGenerator generator = new HiveScanRuntimeGenerator((PartitionDesc) desc);
+ return generator.getRuntimeOperatorAndConstraint(dataSource, scanVariables, projectVariables, projectPushed,
+ context, jobSpec);
+ }
+
+ @Override
+ public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+
+ HiveWriteRuntimeGenerator generator = new HiveWriteRuntimeGenerator((FileSinkOperator) fileSink, outputSchema);
+ return generator.getWriterRuntime(inputDesc);
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> arg0,
+ IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,
+ JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> arg0,
+ IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,
+ JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> arg0,
+ IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, JobGenContext arg4,
+ JobSpecification arg5) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public IFunctionInfo lookupFunction(FunctionIdentifier arg0) {
+ return new HiveFunctionInfo(arg0, null);
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+ IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+ IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveOperatorSchema.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveOperatorSchema.java
new file mode 100644
index 0000000..cdb0e95
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveOperatorSchema.java
@@ -0,0 +1,84 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+
+public class HiveOperatorSchema implements IOperatorSchema {
+
+ private final Map<LogicalVariable, Integer> varMap;
+
+ private final List<LogicalVariable> varList;
+
+ public HiveOperatorSchema() {
+ varMap = new HashMap<LogicalVariable, Integer>();
+ varList = new ArrayList<LogicalVariable>();
+ }
+
+ @Override
+ public void addAllVariables(IOperatorSchema source) {
+ for (LogicalVariable v : source) {
+ varMap.put(v, varList.size());
+ varList.add(v);
+ }
+ }
+
+ @Override
+ public void addAllNewVariables(IOperatorSchema source) {
+ for (LogicalVariable v : source) {
+ if (varMap.get(v) == null) {
+ varMap.put(v, varList.size());
+ varList.add(v);
+ }
+ }
+ }
+
+ @Override
+ public int addVariable(LogicalVariable var) {
+ int idx = varList.size();
+ varMap.put(var, idx);
+ varList.add(var);
+ return idx;
+ }
+
+ @Override
+ public void clear() {
+ varMap.clear();
+ varList.clear();
+ }
+
+ @Override
+ public int findVariable(LogicalVariable var) {
+ Integer i = varMap.get(var);
+ if (i == null) {
+ return -1;
+ }
+ return i;
+ }
+
+ @Override
+ public int getSize() {
+ return varList.size();
+ }
+
+ @Override
+ public LogicalVariable getVariable(int index) {
+ return varList.get(index);
+ }
+
+ @Override
+ public Iterator<LogicalVariable> iterator() {
+ return varList.iterator();
+ }
+
+ @Override
+ public String toString() {
+ return varMap.toString();
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
new file mode 100644
index 0000000..f3af915
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
@@ -0,0 +1,110 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.common.config.ConfUtil;
+import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveFileScanOperatorDescriptor;
+import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveFileSplitProvider;
+import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveTupleParserFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+@SuppressWarnings({ "rawtypes", "deprecation" })
+public class HiveScanRuntimeGenerator {
+
+ private PartitionDesc fileDesc;
+
+ private transient Path filePath;
+
+ private String filePathName;
+
+ private Properties properties;
+
+ public HiveScanRuntimeGenerator(PartitionDesc path) {
+ fileDesc = path;
+ properties = fileDesc.getProperties();
+
+ String inputPath = (String) properties.getProperty("location");
+
+ if (inputPath.startsWith("file:")) {
+ // Windows
+ String[] strs = inputPath.split(":");
+ filePathName = strs[strs.length - 1];
+ } else {
+ // Linux
+ filePathName = inputPath;
+ }
+
+ filePath = new Path(filePathName);
+ }
+
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRuntimeOperatorAndConstraint(
+ IDataSource dataSource, List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables,
+ boolean projectPushed, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+ try {
+ // get the correct delimiter from Hive metastore or other data
+ // structures
+ IOperatorSchema propagatedSchema = new HiveOperatorSchema();
+
+ List<LogicalVariable> outputVariables = projectPushed ? projectVariables : scanVariables;
+ for (LogicalVariable var : outputVariables)
+ propagatedSchema.addVariable(var);
+
+ int[] outputColumnsOffset = new int[scanVariables.size()];
+ int i = 0;
+ for (LogicalVariable var : scanVariables)
+ if (outputVariables.contains(var)) {
+ int offset = outputVariables.indexOf(var);
+ outputColumnsOffset[i++] = offset;
+ } else
+ outputColumnsOffset[i++] = -1;
+
+ Object[] schemaTypes = dataSource.getSchemaTypes();
+ // get record descriptor
+ RecordDescriptor recDescriptor = mkRecordDescriptor(propagatedSchema, schemaTypes, context);
+
+ // setup the run time operator
+ JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);
+ int clusterSize = ConfUtil.getNCs().length;
+ IFileSplitProvider fsprovider = new HiveFileSplitProvider(conf, filePathName, clusterSize);
+ ITupleParserFactory tupleParserFactory = new HiveTupleParserFactory(fileDesc, conf, outputColumnsOffset);
+ HiveFileScanOperatorDescriptor opDesc = new HiveFileScanOperatorDescriptor(jobSpec, fsprovider,
+ tupleParserFactory, recDescriptor);
+
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(opDesc, opDesc.getPartitionConstraint());
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ private static RecordDescriptor mkRecordDescriptor(IOperatorSchema opSchema, Object[] types, JobGenContext context)
+ throws AlgebricksException {
+ ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
+ ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
+ int size = opSchema.getSize();
+ for (int i = 0; i < size; i++) {
+ Object t = types[i];
+ fields[i] = sdp.getSerializerDeserializer(t);
+ i++;
+ }
+ return new RecordDescriptor(fields);
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveWriteRuntimeGenerator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveWriteRuntimeGenerator.java
new file mode 100644
index 0000000..7a577e8
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveWriteRuntimeGenerator.java
@@ -0,0 +1,37 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.common.config.ConfUtil;
+import edu.uci.ics.hivesterix.runtime.operator.filewrite.HivePushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+@SuppressWarnings("deprecation")
+public class HiveWriteRuntimeGenerator {
+ private FileSinkOperator fileSink;
+
+ private Schema inputSchema;
+
+ public HiveWriteRuntimeGenerator(FileSinkOperator fsOp, Schema oi) {
+ fileSink = fsOp;
+ inputSchema = oi;
+ }
+
+ /**
+ * get the write runtime
+ *
+ * @param inputDesc
+ * @return
+ */
+ public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriterRuntime(RecordDescriptor inputDesc) {
+ JobConf conf = ConfUtil.getJobConf();
+ IPushRuntimeFactory factory = new HivePushRuntimeFactory(inputDesc, conf, fileSink, inputSchema);
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> pair = new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(
+ factory, null);
+ return pair;
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/Schema.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/Schema.java
new file mode 100644
index 0000000..927c709
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/Schema.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.hivesterix.runtime.jobgen;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazyUtils;
+
+public class Schema implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private List<String> fieldNames;
+
+ private List<TypeInfo> fieldTypes;
+
+ public Schema(List<String> fieldNames, List<TypeInfo> fieldTypes) {
+ this.fieldNames = fieldNames;
+ this.fieldTypes = fieldTypes;
+ }
+
+ public ObjectInspector toObjectInspector() {
+ return LazyUtils.getLazyObjectInspector(fieldNames, fieldTypes);
+ }
+
+ public List<String> getNames() {
+ return fieldNames;
+ }
+
+ public List<TypeInfo> getTypes() {
+ return fieldTypes;
+ }
+
+ public Object[] getSchema() {
+ return fieldTypes.toArray();
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveFileSplitProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveFileSplitProvider.java
new file mode 100644
index 0000000..03f3312
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveFileSplitProvider.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public abstract class AbstractHiveFileSplitProvider implements IFileSplitProvider {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FileSplit[] getFileSplits() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @SuppressWarnings("deprecation")
+ public abstract org.apache.hadoop.mapred.FileSplit[] getFileSplitArray();
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveTupleParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveTupleParser.java
new file mode 100644
index 0000000..485e1d0
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveTupleParser.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.mapred.FileSplit;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+@SuppressWarnings("deprecation")
+public abstract class AbstractHiveTupleParser implements ITupleParser {
+
+ @Override
+ public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+ // empty implementation
+ }
+
+ /**
+ * method for parsing HDFS file split
+ *
+ * @param split
+ * @param writer
+ */
+ abstract public void parse(FileSplit split, IFrameWriter writer) throws HyracksDataException;
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java
new file mode 100644
index 0000000..a64d398
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.mapred.FileSplit;
+
+import edu.uci.ics.hivesterix.common.config.ConfUtil;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+@SuppressWarnings("deprecation")
+public class HiveFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * tuple parser factory
+ */
+ private final ITupleParserFactory tupleParserFactory;
+
+ /**
+ * Hive file split
+ */
+ private Partition[] parts;
+
+ /**
+ * IFileSplitProvider
+ */
+ private IFileSplitProvider fileSplitProvider;
+
+ /**
+ * constrains in the form of host DNS names
+ */
+ private String[] constraintsByHostNames;
+
+ /**
+ * ip-to-node controller mapping
+ */
+ private Map<String, List<String>> ncMapping;
+
+ /**
+ * an array of NCs
+ */
+ private String[] NCs;
+
+ /**
+ * @param spec
+ * @param fsProvider
+ */
+ public HiveFileScanOperatorDescriptor(JobSpecification spec, IFileSplitProvider fsProvider,
+ ITupleParserFactory tupleParserFactory, RecordDescriptor rDesc) {
+ super(spec, 0, 1);
+ this.tupleParserFactory = tupleParserFactory;
+ recordDescriptors[0] = rDesc;
+ fileSplitProvider = fsProvider;
+ }
+
+ /**
+ * set partition constraint at the first time it is called the number of
+ * partitions is obtained from HDFS name node
+ */
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
+ try {
+ FileSplit[] returnedSplits = ((AbstractHiveFileSplitProvider) fileSplitProvider).getFileSplitArray();
+ Random random = new Random(System.currentTimeMillis());
+ ncMapping = ConfUtil.getNCMapping();
+ NCs = ConfUtil.getNCs();
+
+ int size = 0;
+ for (FileSplit split : returnedSplits)
+ if (split != null)
+ size++;
+
+ FileSplit[] splits = new FileSplit[size];
+ for (int i = 0; i < returnedSplits.length; i++)
+ if (returnedSplits[i] != null)
+ splits[i] = returnedSplits[i];
+
+ System.out.println("number of splits: " + splits.length);
+ constraintsByHostNames = new String[splits.length];
+ for (int i = 0; i < splits.length; i++) {
+ try {
+ String[] loc = splits[i].getLocations();
+ Collections.shuffle(Arrays.asList(loc), random);
+ if (loc.length > 0) {
+ InetAddress[] allIps = InetAddress.getAllByName(loc[0]);
+ for (InetAddress ip : allIps) {
+ if (ncMapping.get(ip.getHostAddress()) != null) {
+ List<String> ncs = ncMapping.get(ip.getHostAddress());
+ int pos = random.nextInt(ncs.size());
+ constraintsByHostNames[i] = ncs.get(pos);
+ } else {
+ int pos = random.nextInt(NCs.length);
+ constraintsByHostNames[i] = NCs[pos];
+ }
+ }
+ } else {
+ int pos = random.nextInt(NCs.length);
+ constraintsByHostNames[i] = NCs[pos];
+ if (splits[i].getLength() > 0)
+ throw new IllegalStateException("non local scanner non locations!!");
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ parts = new Partition[splits.length];
+ for (int i = 0; i < splits.length; i++) {
+ parts[i] = new Partition(splits[i]);
+ }
+ return new AlgebricksAbsolutePartitionConstraint(constraintsByHostNames);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+
+ final ITupleParser tp = tupleParserFactory.createTupleParser(ctx);
+ final int partitionId = partition;
+
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ writer.open();
+ FileSplit split = parts[partitionId].toFileSplit();
+ if (split == null)
+ throw new HyracksDataException("partition " + partitionId + " is null!");
+ ((AbstractHiveTupleParser) tp).parse(split, writer);
+ writer.close();
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileSplitProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileSplitProvider.java
new file mode 100644
index 0000000..af52f27
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileSplitProvider.java
@@ -0,0 +1,108 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.UUID;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.eclipse.jetty.util.log.Log;
+
+@SuppressWarnings({ "deprecation", "rawtypes" })
+public class HiveFileSplitProvider extends AbstractHiveFileSplitProvider {
+ private static final long serialVersionUID = 1L;
+
+ private transient InputFormat format;
+ private transient JobConf conf;
+ private String confContent;
+ final private int nPartition;
+ private transient FileSplit[] splits;
+
+ public HiveFileSplitProvider(JobConf conf, String filePath, int nPartition) {
+ format = conf.getInputFormat();
+ this.conf = conf;
+ this.nPartition = nPartition;
+ writeConfContent();
+ }
+
+ private void writeConfContent() {
+ File dir = new File("hadoop-conf-tmp");
+ if (!dir.exists()) {
+ dir.mkdir();
+ }
+
+ String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
+ try {
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(fileName)));
+ conf.writeXml(out);
+ out.close();
+
+ DataInputStream in = new DataInputStream(new FileInputStream(fileName));
+ StringBuffer buffer = new StringBuffer();
+ String line;
+ while ((line = in.readLine()) != null) {
+ buffer.append(line + "\n");
+ }
+ in.close();
+ confContent = buffer.toString();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void readConfContent() {
+ File dir = new File("hadoop-conf-tmp");
+ if (!dir.exists()) {
+ dir.mkdir();
+ }
+
+ String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
+ try {
+ PrintWriter out = new PrintWriter((new OutputStreamWriter(new FileOutputStream(new File(fileName)))));
+ out.write(confContent);
+ out.close();
+ conf = new JobConf(fileName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ /**
+ * get the HDFS file split
+ */
+ public FileSplit[] getFileSplitArray() {
+ readConfContent();
+ conf.setClassLoader(this.getClass().getClassLoader());
+ format = conf.getInputFormat();
+ // int splitSize = conf.getInt("mapred.min.split.size", 0);
+
+ if (splits == null) {
+ try {
+ splits = (org.apache.hadoop.mapred.FileSplit[]) format.getSplits(conf, nPartition);
+ System.out.println("hdfs split number: " + splits.length);
+ } catch (IOException e) {
+ String inputPath = conf.get("mapred.input.dir");
+ String hdfsURL = conf.get("fs.default.name");
+ String alternatePath = inputPath.replaceAll(hdfsURL, "file:");
+ conf.set("mapred.input.dir", alternatePath);
+ try {
+ splits = (org.apache.hadoop.mapred.FileSplit[]) format.getSplits(conf, nPartition);
+ System.out.println("hdfs split number: " + splits.length);
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ Log.debug(e1.getMessage());
+ return null;
+ }
+ }
+ }
+ return splits;
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
new file mode 100644
index 0000000..718c311
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
@@ -0,0 +1,215 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hivesterix.serde.parser.IHiveParser;
+import edu.uci.ics.hivesterix.serde.parser.TextToBinaryTupleParser;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
+public class HiveTupleParser extends AbstractHiveTupleParser {
+
+ private int[] outputColumnsOffset;
+ /**
+ * class of input format
+ */
+ private InputFormat inputFormat;
+
+ /**
+ * serialization/deserialization object
+ */
+ private SerDe serDe;
+
+ /**
+ * the input row object inspector
+ */
+ private ObjectInspector objectInspector;
+
+ /**
+ * the hadoop job conf
+ */
+ private JobConf job;
+
+ /**
+ * Hyrax context to control resource allocation
+ */
+ private final IHyracksTaskContext ctx;
+
+ /**
+ * lazy serde: format flow in between operators
+ */
+ private final SerDe outputSerDe;
+
+ /**
+ * the parser from hive data to binary data
+ */
+ private IHiveParser parser = null;
+
+ /**
+ * parser for any hive input format
+ *
+ * @param inputFormatClass
+ * @param serDeClass
+ * @param tbl
+ * @param conf
+ * @throws AlgebricksException
+ */
+ public HiveTupleParser(String inputFormatClass, String serDeClass, String outputSerDeClass, Properties tbl,
+ JobConf conf, final IHyracksTaskContext ctx, int[] outputColumnsOffset) throws AlgebricksException {
+ try {
+ conf.setClassLoader(this.getClass().getClassLoader());
+
+ inputFormat = (InputFormat) ReflectionUtils.newInstance(Class.forName(inputFormatClass), conf);
+ job = conf;
+
+ // initialize the input serde
+ serDe = (SerDe) ReflectionUtils.newInstance(Class.forName(serDeClass), job);
+ serDe.initialize(job, tbl);
+
+ // initialize the output serde
+ outputSerDe = (SerDe) ReflectionUtils.newInstance(Class.forName(outputSerDeClass), job);
+ outputSerDe.initialize(job, tbl);
+
+ // object inspector of the row
+ objectInspector = serDe.getObjectInspector();
+
+ // hyracks context
+ this.ctx = ctx;
+ this.outputColumnsOffset = outputColumnsOffset;
+
+ if (objectInspector instanceof LazySimpleStructObjectInspector) {
+ LazySimpleStructObjectInspector rowInspector = (LazySimpleStructObjectInspector) objectInspector;
+ List<? extends StructField> fieldRefs = rowInspector.getAllStructFieldRefs();
+ boolean lightWeightParsable = true;
+ for (StructField fieldRef : fieldRefs) {
+ Category category = fieldRef.getFieldObjectInspector().getCategory();
+ if (!(category == Category.PRIMITIVE)) {
+ lightWeightParsable = false;
+ break;
+ }
+ }
+ if (lightWeightParsable)
+ parser = new TextToBinaryTupleParser(this.outputColumnsOffset, this.objectInspector);
+ }
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ /**
+ * parse a input HDFS file split, the result is send to the writer
+ * one-frame-a-time
+ *
+ * @param split
+ * the HDFS file split
+ * @param writer
+ * the writer
+ * @throws HyracksDataException
+ * if there is sth. wrong in the ser/de
+ */
+ @Override
+ public void parse(FileSplit split, IFrameWriter writer) throws HyracksDataException {
+ try {
+ StructObjectInspector structInspector = (StructObjectInspector) objectInspector;
+
+ // create the reader, key, and value
+ RecordReader reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
+ Object key = reader.createKey();
+ Object value = reader.createValue();
+
+ // allocate a new frame
+ ByteBuffer frame = ctx.allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(frame, true);
+
+ List<? extends StructField> fieldRefs = structInspector.getAllStructFieldRefs();
+ int size = 0;
+ for (int i = 0; i < outputColumnsOffset.length; i++)
+ if (outputColumnsOffset[i] >= 0)
+ size++;
+
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(size);
+ DataOutput dos = tb.getDataOutput();
+ StructField[] outputFieldRefs = new StructField[size];
+ Object[] outputFields = new Object[size];
+ for (int i = 0; i < outputColumnsOffset.length; i++)
+ if (outputColumnsOffset[i] >= 0)
+ outputFieldRefs[outputColumnsOffset[i]] = fieldRefs.get(i);
+
+ while (reader.next(key, value)) {
+ // reuse the tuple builder
+ tb.reset();
+ if (parser != null) {
+ Text text = (Text) value;
+ parser.parse(text.getBytes(), 0, text.getLength(), tb);
+ } else {
+ Object row = serDe.deserialize((Writable) value);
+ // write fields to the tuple builder one by one
+ int i = 0;
+ for (StructField fieldRef : fieldRefs) {
+ if (outputColumnsOffset[i] >= 0)
+ outputFields[outputColumnsOffset[i]] = structInspector.getStructFieldData(row, fieldRef);
+ i++;
+ }
+
+ i = 0;
+ for (Object field : outputFields) {
+ BytesWritable fieldWritable = (BytesWritable) outputSerDe.serialize(field,
+ outputFieldRefs[i].getFieldObjectInspector());
+ dos.write(fieldWritable.getBytes(), 0, fieldWritable.getSize());
+ tb.addFieldEndOffset();
+ i++;
+ }
+ }
+
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ if (appender.getTupleCount() <= 0)
+ throw new IllegalStateException("zero tuples in a frame!");
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ reader.close();
+ System.gc();
+
+ // flush the last frame
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ } catch (SerDeException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParserFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParserFactory.java
new file mode 100644
index 0000000..98d730f
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParserFactory.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+@SuppressWarnings("deprecation")
+public class HiveTupleParserFactory implements ITupleParserFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private int[] outputColumns;
+
+ private String outputSerDeClass = LazySerDe.class.getName();
+
+ private String inputSerDeClass;
+
+ private transient JobConf conf;
+
+ private Properties tbl;
+
+ private String confContent;
+
+ private String inputFormatClass;
+
+ public HiveTupleParserFactory(PartitionDesc desc, JobConf conf, int[] outputColumns) {
+ this.conf = conf;
+ tbl = desc.getProperties();
+ inputFormatClass = (String) tbl.getProperty("file.inputformat");
+ inputSerDeClass = (String) tbl.getProperty("serialization.lib");
+ this.outputColumns = outputColumns;
+
+ writeConfContent();
+ }
+
+ @Override
+ public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
+ readConfContent();
+ try {
+ return new HiveTupleParser(inputFormatClass, inputSerDeClass, outputSerDeClass, tbl, conf, ctx,
+ outputColumns);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ private void writeConfContent() {
+ File dir = new File("hadoop-conf-tmp");
+ if (!dir.exists()) {
+ dir.mkdir();
+ }
+
+ String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
+ try {
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(fileName)));
+ conf.writeXml(out);
+ out.close();
+
+ DataInputStream in = new DataInputStream(new FileInputStream(fileName));
+ StringBuffer buffer = new StringBuffer();
+ String line;
+ while ((line = in.readLine()) != null) {
+ buffer.append(line + "\n");
+ }
+ in.close();
+ confContent = buffer.toString();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void readConfContent() {
+ File dir = new File("hadoop-conf-tmp");
+ if (!dir.exists()) {
+ dir.mkdir();
+ }
+
+ String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
+ try {
+ PrintWriter out = new PrintWriter((new OutputStreamWriter(new FileOutputStream(new File(fileName)))));
+ out.write(confContent);
+ out.close();
+
+ conf = new JobConf(fileName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/Partition.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/Partition.java
new file mode 100644
index 0000000..6742a34
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/Partition.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+@SuppressWarnings("deprecation")
+public class Partition implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private String uri;
+ private long offset;
+ private long length;
+ private String[] locations;
+
+ public Partition() {
+ }
+
+ public Partition(FileSplit file) {
+ uri = file.getPath().toUri().toString();
+ offset = file.getStart();
+ length = file.getLength();
+ try {
+ locations = file.getLocations();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public FileSplit toFileSplit() {
+ return new FileSplit(new Path(uri), offset, length, locations);
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
new file mode 100644
index 0000000..3a62b2e
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
@@ -0,0 +1,147 @@
+package edu.uci.ics.hivesterix.runtime.operator.filewrite;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hivesterix.serde.lazy.LazyColumnar;
+import edu.uci.ics.hivesterix.serde.lazy.objectinspector.LazyColumnarObjectInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+@SuppressWarnings("deprecation")
+public class HiveFileWritePushRuntime implements IPushRuntime {
+
+ /**
+ * frame tuple accessor to access byte buffer
+ */
+ private final FrameTupleAccessor accessor;
+
+ /**
+ * input object inspector
+ */
+ private final ObjectInspector inputInspector;
+
+ /**
+ * cachedInput
+ */
+ private final LazyColumnar cachedInput;
+
+ /**
+ * File sink operator of Hive
+ */
+ private final FileSinkDesc fileSink;
+
+ /**
+ * job configuration, which contain name node and other configuration
+ * information
+ */
+ private JobConf conf;
+
+ /**
+ * input object inspector
+ */
+ private final Schema inputSchema;
+
+ /**
+ * a copy of hive schema representation
+ */
+ private RowSchema rowSchema;
+
+ /**
+ * the Hive file sink operator
+ */
+ private FileSinkOperator fsOp;
+
+ /**
+ * cached tuple object reference
+ */
+ private FrameTupleReference tuple = new FrameTupleReference();
+
+ /**
+ * @param spec
+ * @param fsProvider
+ */
+ public HiveFileWritePushRuntime(IHyracksTaskContext context, RecordDescriptor inputRecordDesc, JobConf job,
+ FileSinkDesc fs, RowSchema schema, Schema oi) {
+ fileSink = fs;
+ fileSink.setGatherStats(false);
+
+ rowSchema = schema;
+ conf = job;
+ inputSchema = oi;
+
+ accessor = new FrameTupleAccessor(context.getFrameSize(), inputRecordDesc);
+ inputInspector = inputSchema.toObjectInspector();
+ cachedInput = new LazyColumnar((LazyColumnarObjectInspector) inputInspector);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ fsOp = (FileSinkOperator) OperatorFactory.get(fileSink, rowSchema);
+ fsOp.setChildOperators(null);
+ fsOp.setParentOperators(null);
+ conf.setClassLoader(this.getClass().getClassLoader());
+
+ ObjectInspector[] inspectors = new ObjectInspector[1];
+ inspectors[0] = inputInspector;
+ try {
+ fsOp.initialize(conf, inspectors);
+ fsOp.setExecContext(null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int n = accessor.getTupleCount();
+ try {
+ for (int i = 0; i < n; ++i) {
+ tuple.reset(accessor, i);
+ cachedInput.init(tuple);
+ fsOp.process(cachedInput, 0);
+ }
+ } catch (HiveException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ fsOp.closeOp(false);
+ } catch (HiveException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HivePushRuntimeFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HivePushRuntimeFactory.java
new file mode 100644
index 0000000..6c18231
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HivePushRuntimeFactory.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.hivesterix.runtime.operator.filewrite;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.UUID;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.runtime.jobgen.Schema;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+@SuppressWarnings("deprecation")
+public class HivePushRuntimeFactory implements IPushRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final RecordDescriptor inputRecordDesc;
+ private transient JobConf conf;
+ private final FileSinkDesc fileSink;
+ private final RowSchema outSchema;
+ private final Schema schema;
+
+ /**
+ * the content of the configuration
+ */
+ private String confContent;
+
+ public HivePushRuntimeFactory(RecordDescriptor inputRecordDesc, JobConf conf, FileSinkOperator fsp, Schema sch) {
+ this.inputRecordDesc = inputRecordDesc;
+ this.conf = conf;
+ this.fileSink = fsp.getConf();
+ outSchema = fsp.getSchema();
+ this.schema = sch;
+
+ writeConfContent();
+ }
+
+ @Override
+ public String toString() {
+ return "file write";
+ }
+
+ @Override
+ public IPushRuntime createPushRuntime(IHyracksTaskContext context) throws AlgebricksException {
+ if (conf == null)
+ readConfContent();
+
+ return new HiveFileWritePushRuntime(context, inputRecordDesc, conf, fileSink, outSchema, schema);
+ }
+
+ private void readConfContent() {
+ File dir = new File("hadoop-conf-tmp");
+ if (!dir.exists()) {
+ dir.mkdir();
+ }
+
+ String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
+ try {
+ PrintWriter out = new PrintWriter((new OutputStreamWriter(new FileOutputStream(new File(fileName)))));
+ out.write(confContent);
+ out.close();
+ conf = new JobConf(fileName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void writeConfContent() {
+ File dir = new File("hadoop-conf-tmp");
+ if (!dir.exists()) {
+ dir.mkdir();
+ }
+
+ String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
+ try {
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(fileName)));
+ conf.writeXml(out);
+ out.close();
+
+ DataInputStream in = new DataInputStream(new FileInputStream(fileName));
+ StringBuffer buffer = new StringBuffer();
+ String line;
+ while ((line = in.readLine()) != null) {
+ buffer.append(line + "\n");
+ }
+ in.close();
+ confContent = buffer.toString();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryComparatorFactoryProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryComparatorFactoryProvider.java
new file mode 100644
index 0000000..467ec0a
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryComparatorFactoryProvider.java
@@ -0,0 +1,75 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveByteBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveByteBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveDoubleBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveDoubleBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveFloatBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveFloatBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveIntegerBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveIntegerBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveLongBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveLongBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveShortBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveShortBinaryDescComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveStringBinaryAscComparatorFactory;
+import edu.uci.ics.hivesterix.runtime.factory.comparator.HiveStringBinaryDescComparatorFactory;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class HiveBinaryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider {
+
+ public static final HiveBinaryComparatorFactoryProvider INSTANCE = new HiveBinaryComparatorFactoryProvider();
+
+ private HiveBinaryComparatorFactoryProvider() {
+ }
+
+ @Override
+ public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending)
+ throws AlgebricksException {
+ if (type.equals(TypeInfoFactory.intTypeInfo)) {
+ if (ascending)
+ return HiveIntegerBinaryAscComparatorFactory.INSTANCE;
+ else
+ return HiveIntegerBinaryDescComparatorFactory.INSTANCE;
+
+ } else if (type.equals(TypeInfoFactory.longTypeInfo)) {
+ if (ascending)
+ return HiveLongBinaryAscComparatorFactory.INSTANCE;
+ else
+ return HiveLongBinaryDescComparatorFactory.INSTANCE;
+
+ } else if (type.equals(TypeInfoFactory.floatTypeInfo)) {
+ if (ascending)
+ return HiveFloatBinaryAscComparatorFactory.INSTANCE;
+ else
+ return HiveFloatBinaryDescComparatorFactory.INSTANCE;
+
+ } else if (type.equals(TypeInfoFactory.doubleTypeInfo)) {
+ if (ascending)
+ return HiveDoubleBinaryAscComparatorFactory.INSTANCE;
+ else
+ return HiveDoubleBinaryDescComparatorFactory.INSTANCE;
+ } else if (type.equals(TypeInfoFactory.shortTypeInfo)) {
+ if (ascending)
+ return HiveShortBinaryAscComparatorFactory.INSTANCE;
+ else
+ return HiveShortBinaryDescComparatorFactory.INSTANCE;
+ } else if (type.equals(TypeInfoFactory.stringTypeInfo)) {
+ if (ascending)
+ return HiveStringBinaryAscComparatorFactory.INSTANCE;
+ else
+ return HiveStringBinaryDescComparatorFactory.INSTANCE;
+ } else if (type.equals(TypeInfoFactory.byteTypeInfo) || type.equals(TypeInfoFactory.booleanTypeInfo)) {
+ if (ascending)
+ return HiveByteBinaryAscComparatorFactory.INSTANCE;
+ else
+ return HiveByteBinaryDescComparatorFactory.INSTANCE;
+ } else
+ throw new NotImplementedException();
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFactoryProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFactoryProvider.java
new file mode 100644
index 0000000..473eee1
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFactoryProvider.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveDoubleBinaryHashFunctionFactory;
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveIntegerBinaryHashFunctionFactory;
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveLongBinaryHashFunctionFactory;
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveRawBinaryHashFunctionFactory;
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.HiveStingBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class HiveBinaryHashFunctionFactoryProvider implements IBinaryHashFunctionFactoryProvider {
+
+ public static final HiveBinaryHashFunctionFactoryProvider INSTANCE = new HiveBinaryHashFunctionFactoryProvider();
+
+ private HiveBinaryHashFunctionFactoryProvider() {
+ }
+
+ @Override
+ public IBinaryHashFunctionFactory getBinaryHashFunctionFactory(Object type) throws AlgebricksException {
+ if (type.equals(TypeInfoFactory.intTypeInfo)) {
+ return HiveIntegerBinaryHashFunctionFactory.INSTANCE;
+ } else if (type.equals(TypeInfoFactory.longTypeInfo)) {
+ return HiveLongBinaryHashFunctionFactory.INSTANCE;
+ } else if (type.equals(TypeInfoFactory.stringTypeInfo)) {
+ return HiveStingBinaryHashFunctionFactory.INSTANCE;
+ } else if (type.equals(TypeInfoFactory.doubleTypeInfo)) {
+ return HiveDoubleBinaryHashFunctionFactory.INSTANCE;
+ } else {
+ return HiveRawBinaryHashFunctionFactory.INSTANCE;
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java
new file mode 100644
index 0000000..e7a2e79
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveBinaryHashFunctionFamilyProvider.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import edu.uci.ics.hivesterix.runtime.factory.hashfunction.MurmurHash3BinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class HiveBinaryHashFunctionFamilyProvider implements IBinaryHashFunctionFamilyProvider {
+
+ public static HiveBinaryHashFunctionFamilyProvider INSTANCE = new HiveBinaryHashFunctionFamilyProvider();
+
+ private HiveBinaryHashFunctionFamilyProvider() {
+
+ }
+
+ @Override
+ public IBinaryHashFunctionFamily getBinaryHashFunctionFamily(Object type) throws AlgebricksException {
+ return MurmurHash3BinaryHashFunctionFamily.INSTANCE;
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveNormalizedKeyComputerFactoryProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveNormalizedKeyComputerFactoryProvider.java
new file mode 100644
index 0000000..91bf3e5
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveNormalizedKeyComputerFactoryProvider.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveDoubleAscNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveDoubleDescNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveIntegerAscNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveIntegerDescNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveLongAscNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveLongDescNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveStringAscNormalizedKeyComputerFactory;
+import edu.uci.ics.hivesterix.runtime.factory.normalize.HiveStringDescNormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class HiveNormalizedKeyComputerFactoryProvider implements INormalizedKeyComputerFactoryProvider {
+
+ public static final HiveNormalizedKeyComputerFactoryProvider INSTANCE = new HiveNormalizedKeyComputerFactoryProvider();
+
+ private HiveNormalizedKeyComputerFactoryProvider() {
+ }
+
+ @Override
+ public INormalizedKeyComputerFactory getNormalizedKeyComputerFactory(Object type, boolean ascending) {
+ if (ascending) {
+ if (type.equals(TypeInfoFactory.stringTypeInfo)) {
+ return new HiveStringAscNormalizedKeyComputerFactory();
+ } else if (type.equals(TypeInfoFactory.intTypeInfo)) {
+ return new HiveIntegerAscNormalizedKeyComputerFactory();
+ } else if (type.equals(TypeInfoFactory.longTypeInfo)) {
+ return new HiveLongAscNormalizedKeyComputerFactory();
+ } else if (type.equals(TypeInfoFactory.doubleTypeInfo)) {
+ return new HiveDoubleAscNormalizedKeyComputerFactory();
+ } else {
+ return null;
+ }
+ } else {
+ if (type.equals(TypeInfoFactory.stringTypeInfo)) {
+ return new HiveStringDescNormalizedKeyComputerFactory();
+ } else if (type.equals(TypeInfoFactory.intTypeInfo)) {
+ return new HiveIntegerDescNormalizedKeyComputerFactory();
+ } else if (type.equals(TypeInfoFactory.longTypeInfo)) {
+ return new HiveLongDescNormalizedKeyComputerFactory();
+ } else if (type.equals(TypeInfoFactory.doubleTypeInfo)) {
+ return new HiveDoubleDescNormalizedKeyComputerFactory();
+ } else {
+ return null;
+ }
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HivePrinterFactoryProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HivePrinterFactoryProvider.java
new file mode 100644
index 0000000..10c84d2
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HivePrinterFactoryProvider.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
+
+public class HivePrinterFactoryProvider implements IPrinterFactoryProvider {
+
+ public static IPrinterFactoryProvider INSTANCE = new HivePrinterFactoryProvider();
+
+ @Override
+ public IPrinterFactory getPrinterFactory(Object type) throws AlgebricksException {
+ return null;
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveSerializerDeserializerProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveSerializerDeserializerProvider.java
new file mode 100644
index 0000000..22f81e0
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveSerializerDeserializerProvider.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+
+public class HiveSerializerDeserializerProvider implements ISerializerDeserializerProvider {
+
+ public static final HiveSerializerDeserializerProvider INSTANCE = new HiveSerializerDeserializerProvider();
+
+ private HiveSerializerDeserializerProvider() {
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public ISerializerDeserializer getSerializerDeserializer(Object type) throws AlgebricksException {
+ // return ARecordSerializerDeserializer.SCHEMALESS_INSTANCE;
+ return null;
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveTypeTraitProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveTypeTraitProvider.java
new file mode 100644
index 0000000..be4b149
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/provider/HiveTypeTraitProvider.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hivesterix.runtime.provider;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+
+public class HiveTypeTraitProvider implements ITypeTraitProvider, Serializable {
+ private static final long serialVersionUID = 1L;
+ public static HiveTypeTraitProvider INSTANCE = new HiveTypeTraitProvider();
+
+ private HiveTypeTraitProvider() {
+
+ }
+
+ @Override
+ public ITypeTraits getTypeTrait(Object arg0) {
+ return new ITypeTraits() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int getFixedLength() {
+ return -1;
+ }
+
+ @Override
+ public boolean isFixedLength() {
+ return false;
+ }
+
+ };
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
new file mode 100644
index 0000000..0f445f4
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
+
+import edu.uci.ics.hivesterix.runtime.evaluator.BufferSerDeUtil;
+import edu.uci.ics.hivesterix.runtime.evaluator.SerializableBuffer;
+
+/**
+ * GenericUDAFAverage.
+ */
+@Description(name = "avg", value = "_FUNC_(x) - Returns the mean of a set of numbers")
+public class GenericUDAFAverage extends AbstractGenericUDAFResolver {
+
+ static final Log LOG = LogFactory.getLog(GenericUDAFAverage.class.getName());
+
+ @Override
+ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+ if (parameters.length != 1) {
+ throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected.");
+ }
+
+ if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+ switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case STRING:
+ return new GenericUDAFAverageEvaluator();
+ case BOOLEAN:
+ default:
+ throw new UDFArgumentTypeException(0, "Only numeric or string type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+ }
+
+ /**
+ * GenericUDAFAverageEvaluator.
+ */
+ public static class GenericUDAFAverageEvaluator extends GenericUDAFEvaluator {
+
+ // For PARTIAL1 and COMPLETE
+ PrimitiveObjectInspector inputOI;
+
+ // For PARTIAL2 and FINAL
+ StructObjectInspector soi;
+ StructField countField;
+ StructField sumField;
+ LongObjectInspector countFieldOI;
+ DoubleObjectInspector sumFieldOI;
+
+ // For PARTIAL1 and PARTIAL2
+ Object[] partialResult;
+
+ // For FINAL and COMPLETE
+ DoubleWritable result;
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+ assert (parameters.length == 1);
+ super.init(m, parameters);
+
+ // init input
+ if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
+ inputOI = (PrimitiveObjectInspector) parameters[0];
+ } else {
+ soi = (StructObjectInspector) parameters[0];
+ countField = soi.getStructFieldRef("count");
+ sumField = soi.getStructFieldRef("sum");
+ countFieldOI = (LongObjectInspector) countField.getFieldObjectInspector();
+ sumFieldOI = (DoubleObjectInspector) sumField.getFieldObjectInspector();
+ }
+
+ // init output
+ if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {
+ // The output of a partial aggregation is a struct containing
+ // a "long" count and a "double" sum.
+
+ ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+ foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ ArrayList<String> fname = new ArrayList<String>();
+ fname.add("count");
+ fname.add("sum");
+ partialResult = new Object[2];
+ partialResult[0] = new LongWritable(0);
+ partialResult[1] = new DoubleWritable(0);
+ return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
+
+ } else {
+ result = new DoubleWritable(0);
+ return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ }
+ }
+
+ static class AverageAgg implements SerializableBuffer {
+ long count;
+ double sum;
+
+ @Override
+ public void deSerializeAggBuffer(byte[] data, int start, int len) {
+ count = BufferSerDeUtil.getLong(data, start);
+ start += 8;
+ sum = BufferSerDeUtil.getDouble(data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(byte[] data, int start, int len) {
+ BufferSerDeUtil.writeLong(count, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(sum, data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(DataOutput output) throws IOException {
+ output.writeLong(count);
+ output.writeDouble(sum);
+ }
+ };
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ AverageAgg result = new AverageAgg();
+ reset(result);
+ return result;
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ AverageAgg myagg = (AverageAgg) agg;
+ myagg.count = 0;
+ myagg.sum = 0;
+ }
+
+ boolean warned = false;
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+ assert (parameters.length == 1);
+ Object p = parameters[0];
+ if (p != null) {
+ AverageAgg myagg = (AverageAgg) agg;
+ try {
+ double v = PrimitiveObjectInspectorUtils.getDouble(p, inputOI);
+ myagg.count++;
+ myagg.sum += v;
+ } catch (NumberFormatException e) {
+ if (!warned) {
+ warned = true;
+ LOG.warn(getClass().getSimpleName() + " " + StringUtils.stringifyException(e));
+ LOG.warn(getClass().getSimpleName() + " ignoring similar exceptions.");
+ }
+ }
+ }
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ AverageAgg myagg = (AverageAgg) agg;
+ ((LongWritable) partialResult[0]).set(myagg.count);
+ ((DoubleWritable) partialResult[1]).set(myagg.sum);
+ return partialResult;
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ if (partial != null) {
+ AverageAgg myagg = (AverageAgg) agg;
+ Object partialCount = soi.getStructFieldData(partial, countField);
+ Object partialSum = soi.getStructFieldData(partial, sumField);
+ myagg.count += countFieldOI.get(partialCount);
+ myagg.sum += sumFieldOI.get(partialSum);
+ }
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ AverageAgg myagg = (AverageAgg) agg;
+ if (myagg.count == 0) {
+ return null;
+ } else {
+ result.set(myagg.sum / myagg.count);
+ return result;
+ }
+ }
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java
new file mode 100644
index 0000000..2c4022e
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCorrelation.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.LongWritable;
+
+import edu.uci.ics.hivesterix.runtime.evaluator.BufferSerDeUtil;
+import edu.uci.ics.hivesterix.runtime.evaluator.SerializableBuffer;
+
+/**
+ * Compute the Pearson correlation coefficient corr(x, y), using the following
+ * stable one-pass method, based on: "Formulas for Robust, One-Pass Parallel
+ * Computation of Covariances and Arbitrary-Order Statistical Moments", Philippe
+ * Pebay, Sandia Labs and
+ * "The Art of Computer Programming, volume 2: Seminumerical Algorithms", Donald
+ * Knuth.
+ * Incremental: n : <count> mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : <xavg> my_n =
+ * my_(n-1) + [y_n - my_(n-1)]/n : <yavg> c_n = c_(n-1) + (x_n - mx_(n-1))*(y_n
+ * - my_n) : <covariance * n> vx_n = vx_(n-1) + (x_n - mx_n)(x_n - mx_(n-1)):
+ * <variance * n> vy_n = vy_(n-1) + (y_n - my_n)(y_n - my_(n-1)): <variance * n>
+ * Merge: c_(A,B) = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+ * vx_(A,B) = vx_A + vx_B + (mx_A - mx_B)*(mx_A - mx_B)*n_A*n_B/(n_A+n_B)
+ * vy_(A,B) = vy_A + vy_B + (my_A - my_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+ */
+@Description(name = "corr", value = "_FUNC_(x,y) - Returns the Pearson coefficient of correlation\n"
+ + "between a set of number pairs", extended = "The function takes as arguments any pair of numeric types and returns a double.\n"
+ + "Any pair with a NULL is ignored. If the function is applied to an empty set or\n"
+ + "a singleton set, NULL will be returned. Otherwise, it computes the following:\n"
+ + " COVAR_POP(x,y)/(STDDEV_POP(x)*STDDEV_POP(y))\n"
+ + "where neither x nor y is null,\n"
+ + "COVAR_POP is the population covariance,\n" + "and STDDEV_POP is the population standard deviation.")
+public class GenericUDAFCorrelation extends AbstractGenericUDAFResolver {
+
+ static final Log LOG = LogFactory.getLog(GenericUDAFCorrelation.class.getName());
+
+ @Override
+ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+ if (parameters.length != 2) {
+ throw new UDFArgumentTypeException(parameters.length - 1, "Exactly two arguments are expected.");
+ }
+
+ if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+
+ if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentTypeException(1, "Only primitive type arguments are accepted but "
+ + parameters[1].getTypeName() + " is passed.");
+ }
+
+ switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ switch (((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return new GenericUDAFCorrelationEvaluator();
+ case STRING:
+ case BOOLEAN:
+ default:
+ throw new UDFArgumentTypeException(1, "Only numeric type arguments are accepted but "
+ + parameters[1].getTypeName() + " is passed.");
+ }
+ case STRING:
+ case BOOLEAN:
+ default:
+ throw new UDFArgumentTypeException(0, "Only numeric type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+ }
+
+ /**
+ * Evaluate the Pearson correlation coefficient using a stable one-pass
+ * algorithm, based on work by Philippe Pébay and Donald Knuth.
+ * Incremental: n : <count> mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : <xavg>
+ * my_n = my_(n-1) + [y_n - my_(n-1)]/n : <yavg> c_n = c_(n-1) + (x_n -
+ * mx_(n-1))*(y_n - my_n) : <covariance * n> vx_n = vx_(n-1) + (x_n -
+ * mx_n)(x_n - mx_(n-1)): <variance * n> vy_n = vy_(n-1) + (y_n - my_n)(y_n
+ * - my_(n-1)): <variance * n>
+ * Merge: c_X = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/n_X vx_(A,B)
+ * = vx_A + vx_B + (mx_A - mx_B)*(mx_A - mx_B)*n_A*n_B/(n_A+n_B) vy_(A,B) =
+ * vy_A + vy_B + (my_A - my_B)*(my_A - my_B)*n_A*n_B/(n_A+n_B)
+ */
+ public static class GenericUDAFCorrelationEvaluator extends GenericUDAFEvaluator {
+
+ // For PARTIAL1 and COMPLETE
+ private PrimitiveObjectInspector xInputOI;
+ private PrimitiveObjectInspector yInputOI;
+
+ // For PARTIAL2 and FINAL
+ private StructObjectInspector soi;
+ private StructField countField;
+ private StructField xavgField;
+ private StructField yavgField;
+ private StructField xvarField;
+ private StructField yvarField;
+ private StructField covarField;
+ private LongObjectInspector countFieldOI;
+ private DoubleObjectInspector xavgFieldOI;
+ private DoubleObjectInspector yavgFieldOI;
+ private DoubleObjectInspector xvarFieldOI;
+ private DoubleObjectInspector yvarFieldOI;
+ private DoubleObjectInspector covarFieldOI;
+
+ // For PARTIAL1 and PARTIAL2
+ private Object[] partialResult;
+
+ // For FINAL and COMPLETE
+ private DoubleWritable result;
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+ super.init(m, parameters);
+
+ // init input
+ if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
+ assert (parameters.length == 2);
+ xInputOI = (PrimitiveObjectInspector) parameters[0];
+ yInputOI = (PrimitiveObjectInspector) parameters[1];
+ } else {
+ assert (parameters.length == 1);
+ soi = (StructObjectInspector) parameters[0];
+
+ countField = soi.getStructFieldRef("count");
+ xavgField = soi.getStructFieldRef("xavg");
+ yavgField = soi.getStructFieldRef("yavg");
+ xvarField = soi.getStructFieldRef("xvar");
+ yvarField = soi.getStructFieldRef("yvar");
+ covarField = soi.getStructFieldRef("covar");
+
+ countFieldOI = (LongObjectInspector) countField.getFieldObjectInspector();
+ xavgFieldOI = (DoubleObjectInspector) xavgField.getFieldObjectInspector();
+ yavgFieldOI = (DoubleObjectInspector) yavgField.getFieldObjectInspector();
+ xvarFieldOI = (DoubleObjectInspector) xvarField.getFieldObjectInspector();
+ yvarFieldOI = (DoubleObjectInspector) yvarField.getFieldObjectInspector();
+ covarFieldOI = (DoubleObjectInspector) covarField.getFieldObjectInspector();
+ }
+
+ // init output
+ if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {
+ // The output of a partial aggregation is a struct containing
+ // a long count, two double averages, two double variances,
+ // and a double covariance.
+
+ ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+
+ foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+
+ ArrayList<String> fname = new ArrayList<String>();
+ fname.add("count");
+ fname.add("xavg");
+ fname.add("yavg");
+ fname.add("xvar");
+ fname.add("yvar");
+ fname.add("covar");
+
+ partialResult = new Object[6];
+ partialResult[0] = new LongWritable(0);
+ partialResult[1] = new DoubleWritable(0);
+ partialResult[2] = new DoubleWritable(0);
+ partialResult[3] = new DoubleWritable(0);
+ partialResult[4] = new DoubleWritable(0);
+ partialResult[5] = new DoubleWritable(0);
+
+ return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
+
+ } else {
+ setResult(new DoubleWritable(0));
+ return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ }
+ }
+
+ static class StdAgg implements SerializableBuffer {
+ long count; // number n of elements
+ double xavg; // average of x elements
+ double yavg; // average of y elements
+ double xvar; // n times the variance of x elements
+ double yvar; // n times the variance of y elements
+ double covar; // n times the covariance
+
+ @Override
+ public void deSerializeAggBuffer(byte[] data, int start, int len) {
+ count = BufferSerDeUtil.getLong(data, start);
+ start += 8;
+ xavg = BufferSerDeUtil.getDouble(data, start);
+ start += 8;
+ yavg = BufferSerDeUtil.getDouble(data, start);
+ start += 8;
+ xvar = BufferSerDeUtil.getDouble(data, start);
+ start += 8;
+ yvar = BufferSerDeUtil.getDouble(data, start);
+ start += 8;
+ covar = BufferSerDeUtil.getDouble(data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(byte[] data, int start, int len) {
+ BufferSerDeUtil.writeLong(count, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(xavg, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(yavg, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(xvar, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(yvar, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(covar, data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(DataOutput output) throws IOException {
+ output.writeLong(count);
+ output.writeDouble(xavg);
+ output.writeDouble(yavg);
+ output.writeDouble(xvar);
+ output.writeDouble(yvar);
+ output.writeDouble(covar);
+ }
+ };
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ StdAgg result = new StdAgg();
+ reset(result);
+ return result;
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ StdAgg myagg = (StdAgg) agg;
+ myagg.count = 0;
+ myagg.xavg = 0;
+ myagg.yavg = 0;
+ myagg.xvar = 0;
+ myagg.yvar = 0;
+ myagg.covar = 0;
+ }
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+ assert (parameters.length == 2);
+ Object px = parameters[0];
+ Object py = parameters[1];
+ if (px != null && py != null) {
+ StdAgg myagg = (StdAgg) agg;
+ double vx = PrimitiveObjectInspectorUtils.getDouble(px, xInputOI);
+ double vy = PrimitiveObjectInspectorUtils.getDouble(py, yInputOI);
+ double xavgOld = myagg.xavg;
+ double yavgOld = myagg.yavg;
+ myagg.count++;
+ myagg.xavg += (vx - xavgOld) / myagg.count;
+ myagg.yavg += (vy - yavgOld) / myagg.count;
+ if (myagg.count > 1) {
+ myagg.covar += (vx - xavgOld) * (vy - myagg.yavg);
+ myagg.xvar += (vx - xavgOld) * (vx - myagg.xavg);
+ myagg.yvar += (vy - yavgOld) * (vy - myagg.yavg);
+ }
+ }
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ StdAgg myagg = (StdAgg) agg;
+ ((LongWritable) partialResult[0]).set(myagg.count);
+ ((DoubleWritable) partialResult[1]).set(myagg.xavg);
+ ((DoubleWritable) partialResult[2]).set(myagg.yavg);
+ ((DoubleWritable) partialResult[3]).set(myagg.xvar);
+ ((DoubleWritable) partialResult[4]).set(myagg.yvar);
+ ((DoubleWritable) partialResult[5]).set(myagg.covar);
+ return partialResult;
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ if (partial != null) {
+ StdAgg myagg = (StdAgg) agg;
+
+ Object partialCount = soi.getStructFieldData(partial, countField);
+ Object partialXAvg = soi.getStructFieldData(partial, xavgField);
+ Object partialYAvg = soi.getStructFieldData(partial, yavgField);
+ Object partialXVar = soi.getStructFieldData(partial, xvarField);
+ Object partialYVar = soi.getStructFieldData(partial, yvarField);
+ Object partialCovar = soi.getStructFieldData(partial, covarField);
+
+ long nA = myagg.count;
+ long nB = countFieldOI.get(partialCount);
+
+ if (nA == 0) {
+ // Just copy the information since there is nothing so far
+ myagg.count = countFieldOI.get(partialCount);
+ myagg.xavg = xavgFieldOI.get(partialXAvg);
+ myagg.yavg = yavgFieldOI.get(partialYAvg);
+ myagg.xvar = xvarFieldOI.get(partialXVar);
+ myagg.yvar = yvarFieldOI.get(partialYVar);
+ myagg.covar = covarFieldOI.get(partialCovar);
+ }
+
+ if (nA != 0 && nB != 0) {
+ // Merge the two partials
+ double xavgA = myagg.xavg;
+ double yavgA = myagg.yavg;
+ double xavgB = xavgFieldOI.get(partialXAvg);
+ double yavgB = yavgFieldOI.get(partialYAvg);
+ double xvarB = xvarFieldOI.get(partialXVar);
+ double yvarB = yvarFieldOI.get(partialYVar);
+ double covarB = covarFieldOI.get(partialCovar);
+
+ myagg.count += nB;
+ myagg.xavg = (xavgA * nA + xavgB * nB) / myagg.count;
+ myagg.yavg = (yavgA * nA + yavgB * nB) / myagg.count;
+ myagg.xvar += xvarB + (xavgA - xavgB) * (xavgA - xavgB) * myagg.count;
+ myagg.yvar += yvarB + (yavgA - yavgB) * (yavgA - yavgB) * myagg.count;
+ myagg.covar += covarB + (xavgA - xavgB) * (yavgA - yavgB) * ((double) (nA * nB) / myagg.count);
+ }
+ }
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ StdAgg myagg = (StdAgg) agg;
+
+ if (myagg.count < 2) { // SQL standard - return null for zero or one
+ // pair
+ return null;
+ } else {
+ getResult().set(myagg.covar / java.lang.Math.sqrt(myagg.xvar) / java.lang.Math.sqrt(myagg.yvar));
+ return getResult();
+ }
+ }
+
+ public void setResult(DoubleWritable result) {
+ this.result = result;
+ }
+
+ public DoubleWritable getResult() {
+ return result;
+ }
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
new file mode 100644
index 0000000..dc5eef0
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.LongWritable;
+
+import edu.uci.ics.hivesterix.runtime.evaluator.BufferSerDeUtil;
+import edu.uci.ics.hivesterix.runtime.evaluator.SerializableBuffer;
+
+/**
+ * This class implements the COUNT aggregation function as in SQL.
+ */
+@Description(name = "count", value = "_FUNC_(*) - Returns the total number of retrieved rows, including "
+ + "rows containing NULL values.\n"
+
+ + "_FUNC_(expr) - Returns the number of rows for which the supplied " + "expression is non-NULL.\n"
+
+ + "_FUNC_(DISTINCT expr[, expr...]) - Returns the number of rows for "
+ + "which the supplied expression(s) are unique and non-NULL.")
+public class GenericUDAFCount implements GenericUDAFResolver2 {
+
+ @Override
+ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+ // This method implementation is preserved for backward compatibility.
+ return new GenericUDAFCountEvaluator();
+ }
+
+ @Override
+ public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo) throws SemanticException {
+
+ TypeInfo[] parameters = paramInfo.getParameters();
+
+ if (parameters.length == 0) {
+ if (!paramInfo.isAllColumns()) {
+ throw new UDFArgumentException("Argument expected");
+ }
+ assert !paramInfo.isDistinct() : "DISTINCT not supported with *";
+ } else {
+ if (parameters.length > 1 && !paramInfo.isDistinct()) {
+ throw new UDFArgumentException("DISTINCT keyword must be specified");
+ }
+ assert !paramInfo.isAllColumns() : "* not supported in expression list";
+ }
+
+ return new GenericUDAFCountEvaluator().setCountAllColumns(paramInfo.isAllColumns());
+ }
+
+ /**
+ * GenericUDAFCountEvaluator.
+ */
+ public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {
+ private boolean countAllColumns = false;
+ private LongObjectInspector partialCountAggOI;
+ private LongWritable result;
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+ super.init(m, parameters);
+ partialCountAggOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ result = new LongWritable(0);
+ return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ }
+
+ private GenericUDAFCountEvaluator setCountAllColumns(boolean countAllCols) {
+ countAllColumns = countAllCols;
+ return this;
+ }
+
+ /** class for storing count value. */
+ static class CountAgg implements SerializableBuffer {
+ long value;
+
+ @Override
+ public void deSerializeAggBuffer(byte[] data, int start, int len) {
+ value = BufferSerDeUtil.getLong(data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(byte[] data, int start, int len) {
+ BufferSerDeUtil.writeLong(value, data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(DataOutput output) throws IOException {
+ output.writeLong(value);
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ CountAgg buffer = new CountAgg();
+ reset(buffer);
+ return buffer;
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ ((CountAgg) agg).value = 0;
+ }
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+ // parameters == null means the input table/split is empty
+ if (parameters == null) {
+ return;
+ }
+ if (countAllColumns) {
+ assert parameters.length == 0;
+ ((CountAgg) agg).value++;
+ } else {
+ assert parameters.length > 0;
+ boolean countThisRow = true;
+ for (Object nextParam : parameters) {
+ if (nextParam == null) {
+ countThisRow = false;
+ break;
+ }
+ }
+ if (countThisRow) {
+ ((CountAgg) agg).value++;
+ }
+ }
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ if (partial != null) {
+ long p = partialCountAggOI.get(partial);
+ ((CountAgg) agg).value += p;
+ }
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ result.set(((CountAgg) agg).value);
+ return result;
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ return terminate(agg);
+ }
+ }
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCovariance.java b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCovariance.java
new file mode 100644
index 0000000..0c4448b
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCovariance.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.LongWritable;
+
+import edu.uci.ics.hivesterix.runtime.evaluator.BufferSerDeUtil;
+import edu.uci.ics.hivesterix.runtime.evaluator.SerializableBuffer;
+
+/**
+ * Compute the covariance covar_pop(x, y), using the following one-pass method
+ * (ref. "Formulas for Robust, One-Pass Parallel Computation of Covariances and
+ * Arbitrary-Order Statistical Moments", Philippe Pebay, Sandia Labs):
+ * Incremental: n : <count> mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : <xavg> my_n =
+ * my_(n-1) + [y_n - my_(n-1)]/n : <yavg> c_n = c_(n-1) + (x_n - mx_(n-1))*(y_n
+ * - my_n) : <covariance * n>
+ * Merge: c_X = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/n_X
+ */
+@Description(name = "covariance,covar_pop", value = "_FUNC_(x,y) - Returns the population covariance of a set of number pairs", extended = "The function takes as arguments any pair of numeric types and returns a double.\n"
+ + "Any pair with a NULL is ignored. If the function is applied to an empty set, NULL\n"
+ + "will be returned. Otherwise, it computes the following:\n"
+ + " (SUM(x*y)-SUM(x)*SUM(y)/COUNT(x,y))/COUNT(x,y)\n" + "where neither x nor y is null.")
+public class GenericUDAFCovariance extends AbstractGenericUDAFResolver {
+
+ static final Log LOG = LogFactory.getLog(GenericUDAFCovariance.class.getName());
+
+ @Override
+ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+ if (parameters.length != 2) {
+ throw new UDFArgumentTypeException(parameters.length - 1, "Exactly two arguments are expected.");
+ }
+
+ if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+
+ if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentTypeException(1, "Only primitive type arguments are accepted but "
+ + parameters[1].getTypeName() + " is passed.");
+ }
+
+ switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ switch (((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return new GenericUDAFCovarianceEvaluator();
+ case STRING:
+ case BOOLEAN:
+ default:
+ throw new UDFArgumentTypeException(1, "Only numeric or string type arguments are accepted but "
+ + parameters[1].getTypeName() + " is passed.");
+ }
+ case STRING:
+ case BOOLEAN:
+ default:
+ throw new UDFArgumentTypeException(0, "Only numeric or string type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+ }
+
+ /**
+ * Evaluate the variance using the algorithm described in
+ * http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance,
+ * presumably by Pébay, Philippe (2008), in "Formulas for Robust, One-Pass
+ * Parallel Computation of Covariances and Arbitrary-Order Statistical
+ * Moments", Technical Report SAND2008-6212, Sandia National Laboratories,
+ * http://infoserve.sandia.gov/sand_doc/2008/086212.pdf
+ * Incremental: n : <count> mx_n = mx_(n-1) + [x_n - mx_(n-1)]/n : <xavg>
+ * my_n = my_(n-1) + [y_n - my_(n-1)]/n : <yavg> c_n = c_(n-1) + (x_n -
+ * mx_(n-1))*(y_n - my_n) : <covariance * n>
+ * Merge: c_X = c_A + c_B + (mx_A - mx_B)*(my_A - my_B)*n_A*n_B/n_X
+ * This one-pass algorithm is stable.
+ */
+ public static class GenericUDAFCovarianceEvaluator extends GenericUDAFEvaluator {
+
+ // For PARTIAL1 and COMPLETE
+ private PrimitiveObjectInspector xInputOI;
+ private PrimitiveObjectInspector yInputOI;
+
+ // For PARTIAL2 and FINAL
+ private StructObjectInspector soi;
+ private StructField countField;
+ private StructField xavgField;
+ private StructField yavgField;
+ private StructField covarField;
+ private LongObjectInspector countFieldOI;
+ private DoubleObjectInspector xavgFieldOI;
+ private DoubleObjectInspector yavgFieldOI;
+ private DoubleObjectInspector covarFieldOI;
+
+ // For PARTIAL1 and PARTIAL2
+ private Object[] partialResult;
+
+ // For FINAL and COMPLETE
+ private DoubleWritable result;
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+ super.init(m, parameters);
+
+ // init input
+ if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
+ assert (parameters.length == 2);
+ xInputOI = (PrimitiveObjectInspector) parameters[0];
+ yInputOI = (PrimitiveObjectInspector) parameters[1];
+ } else {
+ assert (parameters.length == 1);
+ soi = (StructObjectInspector) parameters[0];
+
+ countField = soi.getStructFieldRef("count");
+ xavgField = soi.getStructFieldRef("xavg");
+ yavgField = soi.getStructFieldRef("yavg");
+ covarField = soi.getStructFieldRef("covar");
+
+ countFieldOI = (LongObjectInspector) countField.getFieldObjectInspector();
+ xavgFieldOI = (DoubleObjectInspector) xavgField.getFieldObjectInspector();
+ yavgFieldOI = (DoubleObjectInspector) yavgField.getFieldObjectInspector();
+ covarFieldOI = (DoubleObjectInspector) covarField.getFieldObjectInspector();
+ }
+
+ // init output
+ if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {
+ // The output of a partial aggregation is a struct containing
+ // a long count, two double averages, and a double covariance.
+
+ ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+
+ foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+
+ ArrayList<String> fname = new ArrayList<String>();
+ fname.add("count");
+ fname.add("xavg");
+ fname.add("yavg");
+ fname.add("covar");
+
+ partialResult = new Object[4];
+ partialResult[0] = new LongWritable(0);
+ partialResult[1] = new DoubleWritable(0);
+ partialResult[2] = new DoubleWritable(0);
+ partialResult[3] = new DoubleWritable(0);
+
+ return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
+
+ } else {
+ setResult(new DoubleWritable(0));
+ return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ }
+ }
+
+ static class StdAgg implements SerializableBuffer {
+ long count; // number n of elements
+ double xavg; // average of x elements
+ double yavg; // average of y elements
+ double covar; // n times the covariance
+
+ @Override
+ public void deSerializeAggBuffer(byte[] data, int start, int len) {
+ count = BufferSerDeUtil.getLong(data, start);
+ start += 8;
+ xavg = BufferSerDeUtil.getDouble(data, start);
+ start += 8;
+ yavg = BufferSerDeUtil.getDouble(data, start);
+ start += 8;
+ covar = BufferSerDeUtil.getDouble(data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(byte[] data, int start, int len) {
+ BufferSerDeUtil.writeLong(count, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(xavg, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(yavg, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(covar, data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(DataOutput output) throws IOException {
+ output.writeLong(count);
+ output.writeDouble(xavg);
+ output.writeDouble(yavg);
+ output.writeDouble(covar);
+ }
+ };
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ StdAgg result = new StdAgg();
+ reset(result);
+ return result;
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ StdAgg myagg = (StdAgg) agg;
+ myagg.count = 0;
+ myagg.xavg = 0;
+ myagg.yavg = 0;
+ myagg.covar = 0;
+ }
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+ assert (parameters.length == 2);
+ Object px = parameters[0];
+ Object py = parameters[1];
+ if (px != null && py != null) {
+ StdAgg myagg = (StdAgg) agg;
+ double vx = PrimitiveObjectInspectorUtils.getDouble(px, xInputOI);
+ double vy = PrimitiveObjectInspectorUtils.getDouble(py, yInputOI);
+ myagg.count++;
+ myagg.yavg = myagg.yavg + (vy - myagg.yavg) / myagg.count;
+ if (myagg.count > 1) {
+ myagg.covar += (vx - myagg.xavg) * (vy - myagg.yavg);
+ }
+ myagg.xavg = myagg.xavg + (vx - myagg.xavg) / myagg.count;
+ }
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ StdAgg myagg = (StdAgg) agg;
+ ((LongWritable) partialResult[0]).set(myagg.count);
+ ((DoubleWritable) partialResult[1]).set(myagg.xavg);
+ ((DoubleWritable) partialResult[2]).set(myagg.yavg);
+ ((DoubleWritable) partialResult[3]).set(myagg.covar);
+ return partialResult;
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ if (partial != null) {
+ StdAgg myagg = (StdAgg) agg;
+
+ Object partialCount = soi.getStructFieldData(partial, countField);
+ Object partialXAvg = soi.getStructFieldData(partial, xavgField);
+ Object partialYAvg = soi.getStructFieldData(partial, yavgField);
+ Object partialCovar = soi.getStructFieldData(partial, covarField);
+
+ long nA = myagg.count;
+ long nB = countFieldOI.get(partialCount);
+
+ if (nA == 0) {
+ // Just copy the information since there is nothing so far
+ myagg.count = countFieldOI.get(partialCount);
+ myagg.xavg = xavgFieldOI.get(partialXAvg);
+ myagg.yavg = yavgFieldOI.get(partialYAvg);
+ myagg.covar = covarFieldOI.get(partialCovar);
+ }
+
+ if (nA != 0 && nB != 0) {
+ // Merge the two partials
+ double xavgA = myagg.xavg;
+ double yavgA = myagg.yavg;
+ double xavgB = xavgFieldOI.get(partialXAvg);
+ double yavgB = yavgFieldOI.get(partialYAvg);
+ double covarB = covarFieldOI.get(partialCovar);
+
+ myagg.count += nB;
+ myagg.xavg = (xavgA * nA + xavgB * nB) / myagg.count;
+ myagg.yavg = (yavgA * nA + yavgB * nB) / myagg.count;
+ myagg.covar += covarB + (xavgA - xavgB) * (yavgA - yavgB) * ((double) (nA * nB) / myagg.count);
+ }
+ }
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ StdAgg myagg = (StdAgg) agg;
+
+ if (myagg.count == 0) { // SQL standard - return null for zero
+ // elements
+ return null;
+ } else {
+ getResult().set(myagg.covar / (myagg.count));
+ return getResult();
+ }
+ }
+
+ public void setResult(DoubleWritable result) {
+ this.result = result;
+ }
+
+ public DoubleWritable getResult() {
+ return result;
+ }
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
new file mode 100644
index 0000000..afdc397
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
+
+import edu.uci.ics.hivesterix.runtime.evaluator.BufferSerDeUtil;
+import edu.uci.ics.hivesterix.runtime.evaluator.SerializableBuffer;
+
+/**
+ * GenericUDAFSum.
+ */
+@Description(name = "sum", value = "_FUNC_(x) - Returns the sum of a set of numbers")
+public class GenericUDAFSum extends AbstractGenericUDAFResolver {
+
+ static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());
+
+ @Override
+ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+ if (parameters.length != 1) {
+ throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected.");
+ }
+
+ if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+ switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ return new GenericUDAFSumLong();
+ case FLOAT:
+ case DOUBLE:
+ case STRING:
+ return new GenericUDAFSumDouble();
+ case BOOLEAN:
+ default:
+ throw new UDFArgumentTypeException(0, "Only numeric or string type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+ }
+
+ /**
+ * GenericUDAFSumDouble.
+ */
+ public static class GenericUDAFSumDouble extends GenericUDAFEvaluator {
+ private PrimitiveObjectInspector inputOI;
+ private DoubleWritable result;
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+ assert (parameters.length == 1);
+ super.init(m, parameters);
+ result = new DoubleWritable(0);
+ inputOI = (PrimitiveObjectInspector) parameters[0];
+ return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ }
+
+ /** class for storing double sum value. */
+ static class SumDoubleAgg implements SerializableBuffer {
+ boolean empty;
+ double sum;
+
+ @Override
+ public void deSerializeAggBuffer(byte[] data, int start, int len) {
+ empty = BufferSerDeUtil.getBoolean(data, start);
+ start += 1;
+ sum = BufferSerDeUtil.getDouble(data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(byte[] data, int start, int len) {
+ BufferSerDeUtil.writeBoolean(empty, data, start);
+ start += 1;
+ BufferSerDeUtil.writeDouble(sum, data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(DataOutput output) throws IOException {
+ output.writeBoolean(empty);
+ output.writeDouble(sum);
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ SumDoubleAgg result = new SumDoubleAgg();
+ reset(result);
+ return result;
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ SumDoubleAgg myagg = (SumDoubleAgg) agg;
+ myagg.empty = true;
+ myagg.sum = 0;
+ }
+
+ boolean warned = false;
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+ assert (parameters.length == 1);
+ try {
+ merge(agg, parameters[0]);
+ } catch (NumberFormatException e) {
+ if (!warned) {
+ warned = true;
+ LOG.warn(getClass().getSimpleName() + " " + StringUtils.stringifyException(e));
+ LOG.warn(getClass().getSimpleName() + " ignoring similar exceptions.");
+ }
+ }
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ return terminate(agg);
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ if (partial != null) {
+ SumDoubleAgg myagg = (SumDoubleAgg) agg;
+ myagg.empty = false;
+ myagg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
+ }
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ SumDoubleAgg myagg = (SumDoubleAgg) agg;
+ if (myagg.empty) {
+ return null;
+ }
+ result.set(myagg.sum);
+ return result;
+ }
+
+ }
+
+ /**
+ * GenericUDAFSumLong.
+ */
+ public static class GenericUDAFSumLong extends GenericUDAFEvaluator {
+ private PrimitiveObjectInspector inputOI;
+ private LongWritable result;
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+ assert (parameters.length == 1);
+ super.init(m, parameters);
+ result = new LongWritable(0);
+ inputOI = (PrimitiveObjectInspector) parameters[0];
+ return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ }
+
+ /** class for storing double sum value. */
+ static class SumLongAgg implements SerializableBuffer {
+ boolean empty;
+ long sum;
+
+ @Override
+ public void deSerializeAggBuffer(byte[] data, int start, int len) {
+ empty = BufferSerDeUtil.getBoolean(data, start);
+ start += 1;
+ sum = BufferSerDeUtil.getLong(data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(byte[] data, int start, int len) {
+ BufferSerDeUtil.writeBoolean(empty, data, start);
+ start += 1;
+ BufferSerDeUtil.writeLong(sum, data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(DataOutput output) throws IOException {
+ output.writeBoolean(empty);
+ output.writeLong(sum);
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ SumLongAgg result = new SumLongAgg();
+ reset(result);
+ return result;
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ SumLongAgg myagg = (SumLongAgg) agg;
+ myagg.empty = true;
+ myagg.sum = 0;
+ }
+
+ private boolean warned = false;
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+ assert (parameters.length == 1);
+ try {
+ merge(agg, parameters[0]);
+ } catch (NumberFormatException e) {
+ if (!warned) {
+ warned = true;
+ LOG.warn(getClass().getSimpleName() + " " + StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ return terminate(agg);
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ if (partial != null) {
+ SumLongAgg myagg = (SumLongAgg) agg;
+ myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
+ myagg.empty = false;
+ }
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ SumLongAgg myagg = (SumLongAgg) agg;
+ if (myagg.empty) {
+ return null;
+ }
+ result.set(myagg.sum);
+ return result;
+ }
+
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFVariance.java b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFVariance.java
new file mode 100644
index 0000000..e839008
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFVariance.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
+
+import edu.uci.ics.hivesterix.runtime.evaluator.BufferSerDeUtil;
+import edu.uci.ics.hivesterix.runtime.evaluator.SerializableBuffer;
+
+/**
+ * Compute the variance. This class is extended by: GenericUDAFVarianceSample
+ * GenericUDAFStd GenericUDAFStdSample
+ */
+@Description(name = "variance,var_pop", value = "_FUNC_(x) - Returns the variance of a set of numbers")
+public class GenericUDAFVariance extends AbstractGenericUDAFResolver {
+
+ static final Log LOG = LogFactory.getLog(GenericUDAFVariance.class.getName());
+
+ @Override
+ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+ if (parameters.length != 1) {
+ throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected.");
+ }
+
+ if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+ switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case STRING:
+ return new GenericUDAFVarianceEvaluator();
+ case BOOLEAN:
+ default:
+ throw new UDFArgumentTypeException(0, "Only numeric or string type arguments are accepted but "
+ + parameters[0].getTypeName() + " is passed.");
+ }
+ }
+
+ /**
+ * Evaluate the variance using the algorithm described by Chan, Golub, and
+ * LeVeque in
+ * "Algorithms for computing the sample variance: analysis and recommendations"
+ * The American Statistician, 37 (1983) pp. 242--247.
+ * variance = variance1 + variance2 + n/(m*(m+n)) * pow(((m/n)*t1 - t2),2)
+ * where: - variance is sum[x-avg^2] (this is actually n times the variance)
+ * and is updated at every step. - n is the count of elements in chunk1 - m
+ * is the count of elements in chunk2 - t1 = sum of elements in chunk1, t2 =
+ * sum of elements in chunk2.
+ * This algorithm was proven to be numerically stable by J.L. Barlow in
+ * "Error analysis of a pairwise summation algorithm to compute sample variance"
+ * Numer. Math, 58 (1991) pp. 583--590
+ */
+ public static class GenericUDAFVarianceEvaluator extends GenericUDAFEvaluator {
+
+ // For PARTIAL1 and COMPLETE
+ private PrimitiveObjectInspector inputOI;
+
+ // For PARTIAL2 and FINAL
+ private StructObjectInspector soi;
+ private StructField countField;
+ private StructField sumField;
+ private StructField varianceField;
+ private LongObjectInspector countFieldOI;
+ private DoubleObjectInspector sumFieldOI;
+
+ // For PARTIAL1 and PARTIAL2
+ private Object[] partialResult;
+
+ // For FINAL and COMPLETE
+ private DoubleWritable result;
+
+ @Override
+ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+ assert (parameters.length == 1);
+ super.init(m, parameters);
+
+ // init input
+ if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
+ inputOI = (PrimitiveObjectInspector) parameters[0];
+ } else {
+ soi = (StructObjectInspector) parameters[0];
+
+ countField = soi.getStructFieldRef("count");
+ sumField = soi.getStructFieldRef("sum");
+ varianceField = soi.getStructFieldRef("variance");
+
+ countFieldOI = (LongObjectInspector) countField.getFieldObjectInspector();
+ sumFieldOI = (DoubleObjectInspector) sumField.getFieldObjectInspector();
+ }
+
+ // init output
+ if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {
+ // The output of a partial aggregation is a struct containing
+ // a long count and doubles sum and variance.
+
+ ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+
+ foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+
+ ArrayList<String> fname = new ArrayList<String>();
+ fname.add("count");
+ fname.add("sum");
+ fname.add("variance");
+
+ partialResult = new Object[3];
+ partialResult[0] = new LongWritable(0);
+ partialResult[1] = new DoubleWritable(0);
+ partialResult[2] = new DoubleWritable(0);
+
+ return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
+
+ } else {
+ setResult(new DoubleWritable(0));
+ return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ }
+ }
+
+ static class StdAgg implements SerializableBuffer {
+ long count; // number of elements
+ double sum; // sum of elements
+ double variance; // sum[x-avg^2] (this is actually n times the
+ // variance)
+
+ @Override
+ public void deSerializeAggBuffer(byte[] data, int start, int len) {
+ count = BufferSerDeUtil.getLong(data, start);
+ start += 8;
+ sum = BufferSerDeUtil.getDouble(data, start);
+ start += 8;
+ variance = BufferSerDeUtil.getDouble(data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(byte[] data, int start, int len) {
+ BufferSerDeUtil.writeLong(count, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(sum, data, start);
+ start += 8;
+ BufferSerDeUtil.writeDouble(variance, data, start);
+ }
+
+ @Override
+ public void serializeAggBuffer(DataOutput output) throws IOException {
+ output.writeLong(count);
+ output.writeDouble(sum);
+ output.writeDouble(variance);
+ }
+ };
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ StdAgg result = new StdAgg();
+ reset(result);
+ return result;
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ StdAgg myagg = (StdAgg) agg;
+ myagg.count = 0;
+ myagg.sum = 0;
+ myagg.variance = 0;
+ }
+
+ private boolean warned = false;
+
+ @Override
+ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+ assert (parameters.length == 1);
+ Object p = parameters[0];
+ if (p != null) {
+ StdAgg myagg = (StdAgg) agg;
+ try {
+ double v = PrimitiveObjectInspectorUtils.getDouble(p, inputOI);
+ myagg.count++;
+ myagg.sum += v;
+ if (myagg.count > 1) {
+ double t = myagg.count * v - myagg.sum;
+ myagg.variance += (t * t) / ((double) myagg.count * (myagg.count - 1));
+ }
+ } catch (NumberFormatException e) {
+ if (!warned) {
+ warned = true;
+ LOG.warn(getClass().getSimpleName() + " " + StringUtils.stringifyException(e));
+ LOG.warn(getClass().getSimpleName() + " ignoring similar exceptions.");
+ }
+ }
+ }
+ }
+
+ @Override
+ public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+ StdAgg myagg = (StdAgg) agg;
+ ((LongWritable) partialResult[0]).set(myagg.count);
+ ((DoubleWritable) partialResult[1]).set(myagg.sum);
+ ((DoubleWritable) partialResult[2]).set(myagg.variance);
+ return partialResult;
+ }
+
+ @Override
+ public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+ if (partial != null) {
+ StdAgg myagg = (StdAgg) agg;
+
+ Object partialCount = soi.getStructFieldData(partial, countField);
+ Object partialSum = soi.getStructFieldData(partial, sumField);
+ Object partialVariance = soi.getStructFieldData(partial, varianceField);
+
+ long n = myagg.count;
+ long m = countFieldOI.get(partialCount);
+
+ if (n == 0) {
+ // Just copy the information since there is nothing so far
+ myagg.variance = sumFieldOI.get(partialVariance);
+ myagg.count = countFieldOI.get(partialCount);
+ myagg.sum = sumFieldOI.get(partialSum);
+ }
+
+ if (m != 0 && n != 0) {
+ // Merge the two partials
+
+ double a = myagg.sum;
+ double b = sumFieldOI.get(partialSum);
+
+ myagg.count += m;
+ myagg.sum += b;
+ double t = (m / (double) n) * a - b;
+ myagg.variance += sumFieldOI.get(partialVariance) + ((n / (double) m) / ((double) n + m)) * t * t;
+ }
+ }
+ }
+
+ @Override
+ public Object terminate(AggregationBuffer agg) throws HiveException {
+ StdAgg myagg = (StdAgg) agg;
+
+ if (myagg.count == 0) { // SQL standard - return null for zero
+ // elements
+ return null;
+ } else {
+ if (myagg.count > 1) {
+ getResult().set(myagg.variance / (myagg.count));
+ } else { // for one element the variance is always 0
+ getResult().set(0);
+ }
+ return getResult();
+ }
+ }
+
+ public void setResult(DoubleWritable result) {
+ this.result = result;
+ }
+
+ public DoubleWritable getResult() {
+ return result;
+ }
+ }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/test/java/edu/uci/ics/hyracks/AppTest.java b/hivesterix/hivesterix-runtime/src/test/java/edu/uci/ics/hyracks/AppTest.java
new file mode 100644
index 0000000..0c701c8
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/test/java/edu/uci/ics/hyracks/AppTest.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.hyracks;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+ extends TestCase
+{
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public AppTest( String testName )
+ {
+ super( testName );
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite()
+ {
+ return new TestSuite( AppTest.class );
+ }
+
+ /**
+ * Rigourous Test :-)
+ */
+ public void testApp()
+ {
+ assertTrue( true );
+ }
+}