Initial Algebricks integration
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_algebricks_integration@854 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java
new file mode 100644
index 0000000..ed98eec
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IUnnestingFunction;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Test unnesting-function factory that emits the elements of a fixed {@code int} array,
+ * one element per successful {@code step()} call of the functions it creates.
+ */
+public class IntArrayUnnester implements IUnnestingFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    /** Values to emit; the array passed to the constructor is stored as-is, not copied. */
+    private final int[] x;
+
+    /** @param x the values the created functions emit, in array order */
+    public IntArrayUnnester(int[] x) {
+        this.x = x;
+    }
+
+    /**
+     * Creates a function that writes to the {@link DataOutput} obtained from {@code provider}.
+     * @param provider supplies the output that each emitted value is written to
+     * @return a function whose {@code init} restarts the sequence at the first element
+     */
+    @Override
+    public IUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException {
+        final DataOutput out = provider.getDataOutput();
+        return new IUnnestingFunction() {
+            /** Index of the next element of {@code x} to emit. */
+            private int pos;
+            @Override
+            public void init(IFrameTupleReference tuple) throws AlgebricksException {
+                pos = 0; // the tuple is not read; every init restarts the same sequence
+            }
+
+            @Override
+            public boolean step() throws AlgebricksException {
+                if (pos >= x.length) {
+                    return false; // end of sequence: nothing is written
+                }
+                try {
+                    out.writeInt(x[pos]); // DataOutput.writeInt emits four bytes per element
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+                ++pos;
+                return true;
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
new file mode 100644
index 0000000..57c0789
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/** Test evaluator factory whose evaluators write the {@code int} sum of two child evaluators' results. */
+public class IntegerAddEvalFactory implements IEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final IEvaluatorFactory evalLeftFactory;
+    private final IEvaluatorFactory evalRightFactory;
+
+    /** Builds the factory from the factories of the left and right operand evaluators. */
+    public IntegerAddEvalFactory(IEvaluatorFactory evalLeftFactory, IEvaluatorFactory evalRightFactory) {
+        this.evalLeftFactory = evalLeftFactory;
+        this.evalRightFactory = evalRightFactory;
+    }
+
+    /** Creates an evaluator that writes {@code left + right} to the output obtained from {@code output}. */
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+            private final DataOutput out = output.getDataOutput();
+            // Scratch buffer shared by both operands; it is reset before each operand is evaluated.
+            private final ArrayBackedValueStorage argOut = new ArrayBackedValueStorage();
+            private final IEvaluator evalLeft = evalLeftFactory.createEvaluator(argOut);
+            private final IEvaluator evalRight = evalRightFactory.createEvaluator(argOut);
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                argOut.reset();
+                evalLeft.evaluate(tuple);
+                // The left value must be read before the shared buffer is reset for the right operand.
+                int v1 = IntegerSerializerDeserializer.getInt(argOut.getBytes(), 0);
+                argOut.reset();
+                evalRight.evaluate(tuple);
+                int v2 = IntegerSerializerDeserializer.getInt(argOut.getBytes(), 0);
+                try {
+                    out.writeInt(v1 + v2);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
new file mode 100644
index 0000000..880a4df
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/** Test evaluator factory whose evaluators write one fixed {@code int} value on every call. */
+public class IntegerConstantEvalFactory implements IEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final int value;
+
+    /** @param value the constant that every created evaluator writes */
+    public IntegerConstantEvalFactory(int value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "IntConstantEvalFactory " + value;
+    }
+
+    /** Creates an evaluator that copies the serialized constant to the output obtained from {@code output}. */
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+            private DataOutput target = output.getDataOutput();
+            private ArrayBackedValueStorage serialized = new ArrayBackedValueStorage();
+            boolean needsSerialization = true;
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                if (needsSerialization) { // only the first call fills the buffer; later calls reuse it
+                    needsSerialization = false;
+                    try {
+                        IntegerSerializerDeserializer.INSTANCE.serialize(value, serialized.getDataOutput());
+                    } catch (HyracksDataException e) {
+                        throw new AlgebricksException(e);
+                    }
+                }
+                try {
+                    target.write(serialized.getBytes(), 0, serialized.getLength());
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
new file mode 100644
index 0000000..e15851b
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/** Test evaluator factory whose evaluators write whether two child evaluators produced equal ints. */
+public class IntegerEqualsEvalFactory implements IEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+    private final IEvaluatorFactory evalFact1;
+    private final IEvaluatorFactory evalFact2;
+    /** Builds the factory from the factories of the first and second operand evaluators. */
+    public IntegerEqualsEvalFactory(IEvaluatorFactory evalFact1, IEvaluatorFactory evalFact2) {
+        this.evalFact1 = evalFact1;
+        this.evalFact2 = evalFact2;
+    }
+
+    /** Creates an evaluator that writes {@code first == second} as a boolean to {@code output}'s data output. */
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+            private final DataOutput sink = output.getDataOutput();
+            private final ArrayBackedValueStorage firstBuf = new ArrayBackedValueStorage();
+            private final ArrayBackedValueStorage secondBuf = new ArrayBackedValueStorage();
+            private final IEvaluator firstEval = evalFact1.createEvaluator(firstBuf);
+            private final IEvaluator secondEval = evalFact2.createEvaluator(secondBuf);
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                firstBuf.reset();
+                firstEval.evaluate(tuple);
+                secondBuf.reset();
+                secondEval.evaluate(tuple);
+                int first = IntegerSerializerDeserializer.getInt(firstBuf.getBytes(), 0);
+                int second = IntegerSerializerDeserializer.getInt(secondBuf.getBytes(), 0);
+                try {
+                    sink.writeBoolean(first == second);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
new file mode 100644
index 0000000..5c5d6d4
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+/** Test evaluator factory whose evaluators write whether the first child's int exceeds the second's. */
+public class IntegerGreaterThanEvalFactory implements IEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+    private final IEvaluatorFactory evalFact1;
+    private final IEvaluatorFactory evalFact2;
+    /** Builds the factory from the factories of the first and second operand evaluators. */
+    public IntegerGreaterThanEvalFactory(IEvaluatorFactory evalFact1, IEvaluatorFactory evalFact2) {
+        this.evalFact1 = evalFact1;
+        this.evalFact2 = evalFact2;
+    }
+
+    /** Creates an evaluator that writes {@code first > second} as a boolean to {@code output}'s data output. */
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+            private final DataOutput sink = output.getDataOutput();
+            private final ArrayBackedValueStorage firstBuf = new ArrayBackedValueStorage();
+            private final ArrayBackedValueStorage secondBuf = new ArrayBackedValueStorage();
+            private final IEvaluator firstEval = evalFact1.createEvaluator(firstBuf);
+            private final IEvaluator secondEval = evalFact2.createEvaluator(secondBuf);
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                firstBuf.reset();
+                firstEval.evaluate(tuple);
+                secondBuf.reset();
+                secondEval.evaluate(tuple);
+                int first = IntegerSerializerDeserializer.getInt(firstBuf.getBytes(), 0);
+                int second = IntegerSerializerDeserializer.getInt(secondBuf.getBytes(), 0);
+                try {
+                    sink.writeBoolean(first > second);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/script/IdentityStreamingScript.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/script/IdentityStreamingScript.java
new file mode 100644
index 0000000..c914c7c
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/script/IdentityStreamingScript.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.script;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/** Echoes standard input to standard output, one line at a time. */
+public class IdentityStreamingScript {
+
+    /** Prints every line read from stdin to stdout until end of input, then closes the reader. */
+    public static void main(String args[]) throws IOException {
+        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
+        for (String line = stdin.readLine(); line != null; line = stdin.readLine()) {
+            System.out.println(line);
+        }
+        stdin.close();
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.cmd b/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.cmd
new file mode 100644
index 0000000..b8eb4a0
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.cmd
@@ -0,0 +1,63 @@
+@ECHO OFF
+SETLOCAL
+
+:: Licensed to the Apache Software Foundation (ASF) under one or more
+:: contributor license agreements. See the NOTICE file distributed with
+:: this work for additional information regarding copyright ownership.
+:: The ASF licenses this file to You under the Apache License, Version 2.0
+:: (the "License"); you may not use this file except in compliance with
+:: the License. You may obtain a copy of the License at
+::
+:: http://www.apache.org/licenses/LICENSE-2.0
+::
+:: Unless required by applicable law or agreed to in writing, software
+:: distributed under the License is distributed on an "AS IS" BASIS,
+:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+:: See the License for the specific language governing permissions and
+:: limitations under the License.
+
+:: JAVA classpath
+:: Use the local variable CLASSPATH to add custom entries (e.g. JDBC drivers) to
+:: the classpath. Separate multiple paths with ";" (the Windows separator). Enclose
+:: the value in double quotes. Adding additional files or locations on separate
+:: lines makes things clearer.
+:: Note: If running under cygwin use "/cygdrive/c/..." for "C:/..."
+:: Example:
+::
+:: Set the CLASSPATH to a jar file and a directory. Note that
+:: "classes dir" is a directory of class files with a space in the name.
+::
+:: CLASSPATH="usr/local/Product1/lib/product.jar"
+:: CLASSPATH="${CLASSPATH}:../MyProject/classes dir"
+:: NOTE(review): the two example lines above are written in Unix shell syntax, not batch syntax.
+SET CLASSPATH="@classpath@"
+
+:: JVM parameters
+:: If you want to modify the default parameters (e.g. maximum heap size -Xmx)
+:: for the Java virtual machine set the local variable JVM_PARAMETERS below
+:: Example:
+:: JVM_PARAMETERS=-Xms100M -Xmx200M
+::
+:: Below are the JVM parameters needed to do remote debugging using IntelliJ
+:: IDEA. Uncomment and then do: JVM_PARAMETERS="$IDEA_REMOTE_DEBUG_PARAMS"
+:: IDEA_REMOTE_DEBUG_PARAMS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"
+::
+:: JVM_PARAMETERS=
+
+:: ---------------------------------------------------------------------------
+:: Default configuration. Do not modify below this line.
+:: ---------------------------------------------------------------------------
+:: Application specific parameters (the at-sign-delimited tokens are presumably filled in at build time - TODO confirm)
+
+SET MAIN_CLASS=@main.class@
+SET JVM_PARAMS=@jvm.params@
+SET PROGRAM_PARAMS=@program.params@
+
+:: Try to find java virtual machine: keep a preset JAVA, else use JAVA_HOME, else java.exe from the PATH
+IF NOT DEFINED JAVA (
+ IF NOT DEFINED JAVA_HOME SET JAVA="java.exe"
+ IF DEFINED JAVA_HOME SET JAVA="%JAVA_HOME%\bin\java.exe"
+)
+
+:: Run program: JVM options first, then the classpath, the main class, its fixed parameters and the caller's arguments
+%JAVA% %JVM_PARAMS% %JVM_PARAMETERS% -classpath %CLASSPATH% %MAIN_CLASS% %PROGRAM_PARAMS% %*
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.sh b/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.sh
new file mode 100644
index 0000000..a998626
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.sh
@@ -0,0 +1,81 @@
+#!/bin/sh
+# JAVA classpath
+# Use the local variable CLASSPATH to add custom entries (e.g. JDBC drivers) to
+# the classpath. Separate multiple paths with ":". Enclose the value
+# in double quotes. Adding additional files or locations on separate
+# lines makes things clearer.
+# Note: If running under cygwin use "/cygdrive/c/..." for "C:/..."
+# Example:
+#
+# Set the CLASSPATH to a jar file and a directory. Note that
+# "classes dir" is a directory of class files with a space in the name.
+#
+# CLASSPATH="usr/local/Product1/lib/product.jar"
+# CLASSPATH="${CLASSPATH}:../MyProject/classes dir"
+#
+CLASSPATH="@classpath@"
+
+# JVM parameters
+# If you want to modify the default parameters (e.g. maximum heap size -Xmx)
+# for the Java virtual machine set the local variable JVM_PARAMETERS below
+# Example:
+# JVM_PARAMETERS=-Xms100M -Xmx200M
+#
+# Below are the JVM parameters needed to do remote debugging using IntelliJ
+# IDEA. Uncomment and then do: JVM_PARAMETERS="$IDEA_REMOTE_DEBUG_PARAMS"
+# IDEA_REMOTE_DEBUG_PARAMS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"
+#
+# JVM_PARAMETERS=
+
+#run with shared memory setup
+#if [ -n "${RUN_SHARED_MEM}" ]; then
+# JVM_PARAMETERS="${JVM_PARAMETERS} -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=n,address=javadebug,suspend=y"
+#fi
+
+# ---------------------------------------------------------------------------
+# Default configuration. Do not modify below this line.
+# ---------------------------------------------------------------------------
+# Application specific parameters
+
+MAIN_CLASS="@main.class@"
+JVM_PARAMS="@jvm.params@"
+PROGRAM_PARAMS="@program.params@"
+
+# Cygwin support. $cygwin _must_ be set to either true or false.
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  *) cygwin=false ;;
+esac
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin; then
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# Try to find java virtual machine
+if [ -z "${JAVA}" ]; then
+  if [ -z "${JAVA_HOME}" ]; then
+    JAVA=java
+  else
+    JAVA=${JAVA_HOME}/bin/java
+  fi
+fi
+
+# Try to find directory where this script is located
+COMMAND="${PWD}/$0"
+if [ ! -f "${COMMAND}" ]; then
+  COMMAND="$0"
+fi
+BASEDIR=`expr "${COMMAND}" : '\(.*\)/\.*'`
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+# JAVA=`cygpath --path --windows "$JAVA"`
+  CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+fi
+
+# Run program. The quoted argument list forwards every caller argument intact, even one containing spaces.
+${JAVA} ${JVM_PARAMS} ${JVM_PARAMETERS} -classpath "${CLASSPATH}" ${MAIN_CLASS} ${PROGRAM_PARAMS} "$@"
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
new file mode 100644
index 0000000..6bd055d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -0,0 +1,893 @@
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.aggregators.TupleCountAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.aggregators.TupleCountRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.evaluators.ColumnAccessEvalFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.BinaryBooleanInspectorImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.BinaryIntegerInspectorImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.IntegerPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.NoopNullWriterFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.UTF8StringPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.aggreg.AggregateRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.aggreg.SimpleAlgebricksAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.group.MicroPreClusteredGroupRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.SubplanRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.sort.InMemorySortRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.NestedTupleSourceRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.PrinterRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.RunningAggregateRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.StreamLimitRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.StreamProjectRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.StreamSelectRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.StringStreamingRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.UnnestRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.writers.PrinterBasedWriterFactory;
+import edu.uci.ics.hyracks.algebricks.tests.util.AlgebricksHyracksIntegrationUtil;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.IntegerBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
+
+public class PushRuntimeTest {
+
+ private static final String SEPARATOR = System.getProperty("file.separator");
+ private static final String PATH_ACTUAL = "rttest";
+ private static final String PATH_BASE = "src" + SEPARATOR + "test" + SEPARATOR + "resources";
+ private static final String PATH_EXPECTED = PATH_BASE + SEPARATOR + "results";
+
+ private static final String[] DEFAULT_NODES = new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID };
+
+ /** Creates the PATH_ACTUAL output directory, then calls AlgebricksHyracksIntegrationUtil.init(). */
+ @BeforeClass
+ public static void setUp() throws Exception {
+     new File(PATH_ACTUAL).mkdirs();
+     AlgebricksHyracksIntegrationUtil.init();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+     AlgebricksHyracksIntegrationUtil.deinit();
+     File actualDir = new File(PATH_ACTUAL);
+     File[] leftovers = actualDir.listFiles();
+     if (leftovers == null || leftovers.length == 0) {
+         actualDir.delete(); // the directory is kept whenever it still lists any entry
+     }
+ }
+
+ /** Runs a job whose single meta operator chains an empty tuple source, an assign of 400 and 3, and a printer. */
+ @Test
+ public void etsAssignPrint() throws Exception {
+     JobSpecification spec = new JobSpecification();
+     IEvaluatorFactory[] constants = { new IntegerConstantEvalFactory(400), new IntegerConstantEvalFactory(3) };
+
+     EmptyTupleSourceRuntimeFactory source = new EmptyTupleSourceRuntimeFactory();
+     RecordDescriptor sourceDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+     AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, constants, new int[] { 0, 1 });
+     RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+     IPrinterFactory[] printers = { IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE };
+     PrinterRuntimeFactory printer = new PrinterRuntimeFactory(new int[] { 0, 1 }, printers, assignDesc);
+
+     IPushRuntimeFactory[] stages = { source, assign, printer };
+     RecordDescriptor[] stageDescs = { sourceDesc, assignDesc, null };
+     AlgebricksMetaOperatorDescriptor pipeline = new AlgebricksMetaOperatorDescriptor(spec, 0, 0, stages, stageDescs);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, pipeline, DEFAULT_NODES);
+     spec.addRoot(pipeline);
+
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+ }
+
+ /** Runs empty tuple source -> assign(400, 3) -> file sink writer and checks the written file's content. */
+ @Test
+ public void etsAssignWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+     IEvaluatorFactory[] constants = { new IntegerConstantEvalFactory(400), new IntegerConstantEvalFactory(3) };
+
+     EmptyTupleSourceRuntimeFactory source = new EmptyTupleSourceRuntimeFactory();
+     RecordDescriptor sourceDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+     AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, constants, new int[] { 0, 1 });
+     RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+     File outFile = new File(PATH_ACTUAL + SEPARATOR + "etsAssignWrite.out");
+     IPrinterFactory[] printers = { IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE };
+     SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1 }, printers, outFile,
+             PrinterBasedWriterFactory.INSTANCE, assignDesc);
+
+     IPushRuntimeFactory[] stages = { source, assign, writer };
+     RecordDescriptor[] stageDescs = { sourceDesc, assignDesc, null };
+     AlgebricksMetaOperatorDescriptor pipeline = new AlgebricksMetaOperatorDescriptor(spec, 0, 0, stages, stageDescs);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, pipeline, DEFAULT_NODES);
+     spec.addRoot(pipeline);
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+     StringBuilder written = new StringBuilder();
+     readFileToString(outFile, written);
+     Assert.assertEquals("400; 3", written.toString());
+     // Reached only when the assertion passes; a failing run leaves the output file behind.
+     outFile.delete();
+ }
+
+ /**
+  * Scans data/simple/int-part1.tbl as a single integer column and connects the scanner one-to-one to a
+  * meta operator holding a select followed by a file sink. The select condition is an
+  * IntegerGreaterThanEvalFactory built over the constant 2 and column 0. The sink file
+  * scanSelectWrite.out is expected to contain "0".
+  */
+ @Test
+ public void scanSelectWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+
+     // file scan producing one integer field per tuple
+     FileSplit[] splits = { new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+             "data/simple/int-part1.tbl"))) };
+     IFileSplitProvider splitSource = new ConstantFileSplitProvider(splits);
+     RecordDescriptor scanDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+     IValueParserFactory[] fieldParsers = { IntegerParserFactory.INSTANCE };
+     FileScanOperatorDescriptor scan = new FileScanOperatorDescriptor(spec, splitSource,
+             new DelimitedDataTupleParserFactory(fieldParsers, '|'), scanDesc);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, DEFAULT_NODES);
+
+     // select + sink in one meta operator; the select output reuses the scan descriptor
+     IEvaluatorFactory predicate = new IntegerGreaterThanEvalFactory(new IntegerConstantEvalFactory(2),
+             new ColumnAccessEvalFactory(0));
+     StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(predicate, new int[] { 0 },
+             BinaryBooleanInspectorImpl.INSTANCE);
+
+     File resultFile = new File(PATH_ACTUAL + SEPARATOR + "scanSelectWrite.out");
+     IPrinterFactory[] printers = { IntegerPrinterFactory.INSTANCE };
+     SinkWriterRuntimeFactory sink = new SinkWriterRuntimeFactory(new int[] { 0 }, printers, resultFile,
+             PrinterBasedWriterFactory.INSTANCE, scanDesc);
+
+     IPushRuntimeFactory[] stages = { select, sink };
+     RecordDescriptor[] stageDescs = { scanDesc, null };
+     AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, stages,
+             stageDescs);
+     PartitionConstraintHelper.addPartitionCountConstraint(spec, metaOp, 1);
+
+     spec.connect(new OneToOneConnectorDescriptor(spec), scan, 0, metaOp, 0);
+     spec.addRoot(metaOp);
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+     StringBuilder written = new StringBuilder();
+     readFileToString(resultFile, written);
+     Assert.assertEquals("0", written.toString());
+     resultFile.delete();
+ }
+
+ @Test
+ public void etsAssignProjectWrite() throws Exception {
+
+ JobSpecification spec = new JobSpecification();
+ IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
+ IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
+
+ EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+ RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, new IEvaluatorFactory[] { const1,
+ const2 }, new int[] { 0, 1 });
+ RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ StreamProjectRuntimeFactory project = new StreamProjectRuntimeFactory(new int[] { 1 });
+ RecordDescriptor projectDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+ String filePath = PATH_ACTUAL + SEPARATOR + "etsAssignProjectWrite.out";
+ File outFile = new File(filePath);
+ SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+ new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+ projectDesc);
+
+ AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+ new IPushRuntimeFactory[] { ets, assign, project, writer }, new RecordDescriptor[] { etsDesc,
+ assignDesc, projectDesc, null });
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp, DEFAULT_NODES);
+
+ spec.addRoot(algebricksOp);
+ AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+ StringBuilder buf = new StringBuilder();
+ readFileToString(outFile, buf);
+ Assert.assertEquals("3", buf.toString());
+ outFile.delete();
+ }
+
+ /**
+  * Scans data/tpch0.001/customer.tbl with an eight-field schema and connects the scanner one-to-one
+  * to a meta operator holding a stream limit (constructed with the integer constant 2, no second
+  * evaluator, and projection onto column 0) followed by a file sink. The sink file
+  * scanLimitWrite.out is expected to contain "12".
+  */
+ @Test
+ public void scanLimitWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+
+     // file scan over the customer table
+     FileSplit[] splits = { new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+             "data/tpch0.001/customer.tbl"))) };
+     IFileSplitProvider splitSource = new ConstantFileSplitProvider(splits);
+
+     RecordDescriptor scanDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+     IValueParserFactory[] fieldParsers = { IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+     FileScanOperatorDescriptor scan = new FileScanOperatorDescriptor(spec, splitSource,
+             new DelimitedDataTupleParserFactory(fieldParsers, '|'), scanDesc);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, DEFAULT_NODES);
+
+     // limit + sink inside one meta operator
+     IntegerConstantEvalFactory two = new IntegerConstantEvalFactory(2);
+     StreamLimitRuntimeFactory limit = new StreamLimitRuntimeFactory(two, null, new int[] { 0 },
+             BinaryIntegerInspectorImpl.INSTANCE);
+     RecordDescriptor limitDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+     File resultFile = new File(PATH_ACTUAL + SEPARATOR + "scanLimitWrite.out");
+     IPrinterFactory[] printers = { IntegerPrinterFactory.INSTANCE };
+     SinkWriterRuntimeFactory sink = new SinkWriterRuntimeFactory(new int[] { 0 }, printers, resultFile,
+             PrinterBasedWriterFactory.INSTANCE, limitDesc);
+
+     IPushRuntimeFactory[] stages = { limit, sink };
+     RecordDescriptor[] stageDescs = { limitDesc, null };
+     AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, stages,
+             stageDescs);
+     String[] metaOpLocation = { AlgebricksHyracksIntegrationUtil.NC1_ID };
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, metaOp, metaOpLocation);
+
+     spec.connect(new OneToOneConnectorDescriptor(spec), scan, 0, metaOp, 0);
+     spec.addRoot(metaOp);
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+     StringBuilder written = new StringBuilder();
+     readFileToString(resultFile, written);
+     Assert.assertEquals("12", written.toString());
+     resultFile.delete();
+ }
+
+ @Test
+ public void etsUnnestWrite() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+ RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+ IUnnestingFunctionFactory aggregFactory = new IntArrayUnnester(new int[] { 100, 200, 300 });
+ UnnestRuntimeFactory unnest = new UnnestRuntimeFactory(0, aggregFactory, new int[] { 0 });
+ RecordDescriptor unnestDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+ String filePath = PATH_ACTUAL + SEPARATOR + "etsUnnestWrite.out";
+ File outFile = new File(filePath);
+ SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+ new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+ unnestDesc);
+
+ AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+ new IPushRuntimeFactory[] { ets, unnest, writer }, new RecordDescriptor[] { etsDesc, unnestDesc, null });
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
+ new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+ spec.addRoot(algebricksOp);
+ AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+ StringBuilder buf = new StringBuilder();
+ readFileToString(outFile, buf);
+ Assert.assertEquals("100200300", buf.toString());
+ outFile.delete();
+ }
+
+ /**
+  * Scans data/tpch0.001/customer-part1.tbl with an eight-field schema and connects the scanner
+  * one-to-one to a meta operator holding an aggregate (a single TupleCountAggregateFunctionFactory)
+  * followed by a file sink. The sink file scanAggregateWrite.out is expected to contain "75".
+  */
+ @Test
+ public void scanAggregateWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+
+     // file scan over the first customer partition
+     FileSplit[] splits = { new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+             "data/tpch0.001/customer-part1.tbl"))) };
+     IFileSplitProvider splitSource = new ConstantFileSplitProvider(splits);
+     RecordDescriptor scanDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+     IValueParserFactory[] fieldParsers = { IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+     FileScanOperatorDescriptor scan = new FileScanOperatorDescriptor(spec, splitSource,
+             new DelimitedDataTupleParserFactory(fieldParsers, '|'), scanDesc);
+     String[] scanLocation = { AlgebricksHyracksIntegrationUtil.NC1_ID };
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, scanLocation);
+
+     // aggregate + sink inside one meta operator
+     IAggregateFunctionFactory[] aggFunctions = { new TupleCountAggregateFunctionFactory() };
+     AggregateRuntimeFactory aggregate = new AggregateRuntimeFactory(aggFunctions);
+     RecordDescriptor aggregateDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+     File resultFile = new File(PATH_ACTUAL + SEPARATOR + "scanAggregateWrite.out");
+     IPrinterFactory[] printers = { IntegerPrinterFactory.INSTANCE };
+     SinkWriterRuntimeFactory sink = new SinkWriterRuntimeFactory(new int[] { 0 }, printers, resultFile,
+             PrinterBasedWriterFactory.INSTANCE, aggregateDesc);
+
+     IPushRuntimeFactory[] stages = { aggregate, sink };
+     RecordDescriptor[] stageDescs = { aggregateDesc, null };
+     AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, stages,
+             stageDescs);
+     PartitionConstraintHelper.addPartitionCountConstraint(spec, metaOp, 1);
+
+     spec.connect(new OneToOneConnectorDescriptor(spec), scan, 0, metaOp, 0);
+     spec.addRoot(metaOp);
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+     StringBuilder written = new StringBuilder();
+     readFileToString(resultFile, written);
+     Assert.assertEquals("75", written.toString());
+     resultFile.delete();
+ }
+
+ /**
+  * Scans data/tpch0.001/customer.tbl, sorts it in memory on field 3, groups the sorted stream on
+  * field 3 with a preclustered group-by whose nested plan counts tuples, and finally runs a meta
+  * operator that selects on IntegerEqualsEvalFactory(constant 3, column 0), projects column 1 and
+  * writes it to a file. The sink file scanSortGbySelectWrite.out is expected to contain "9".
+  */
+ @Test
+ public void scanSortGbySelectWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+
+     // file scan over the customer table
+     FileSplit[] splits = { new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+             "data/tpch0.001/customer.tbl"))) };
+     IFileSplitProvider splitSource = new ConstantFileSplitProvider(splits);
+     RecordDescriptor scanDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+     IValueParserFactory[] fieldParsers = { IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+     FileScanOperatorDescriptor scan = new FileScanOperatorDescriptor(spec, splitSource,
+             new DelimitedDataTupleParserFactory(fieldParsers, '|'), scanDesc);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
+             new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+     // in-memory sort by nation id (field 3); the sort output keeps the scan descriptor
+     IBinaryComparatorFactory[] sortComparators = { IntegerBinaryComparatorFactory.INSTANCE };
+     InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 3 },
+             sortComparators, scanDesc);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+             new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+     // preclustered group-by on field 3 whose nested pipeline is tuple source -> count aggregate
+     NestedTupleSourceRuntimeFactory nestedSource = new NestedTupleSourceRuntimeFactory();
+     IAggregateFunctionFactory[] aggFunctions = { new TupleCountAggregateFunctionFactory() };
+     AggregateRuntimeFactory aggregate = new AggregateRuntimeFactory(aggFunctions);
+     RecordDescriptor aggregateDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+     IPushRuntimeFactory[] nestedStages = { nestedSource, aggregate };
+     RecordDescriptor[] nestedDescs = { scanDesc, aggregateDesc };
+     AlgebricksPipeline nestedPlan = new AlgebricksPipeline(nestedStages, nestedDescs);
+     NestedPlansAccumulatingAggregatorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(
+             new AlgebricksPipeline[] { nestedPlan }, new int[] { 3 }, new int[] {});
+     RecordDescriptor groupDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+     IBinaryComparatorFactory[] groupComparators = { IntegerBinaryComparatorFactory.INSTANCE };
+     PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec,
+             new int[] { 3 }, groupComparators, aggregatorFactory, groupDesc);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+             new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+     // select + sink inside one meta operator; the condition picks the Canadian customers
+     IEvaluatorFactory predicate = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
+             new ColumnAccessEvalFactory(0));
+     StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(predicate, new int[] { 1 },
+             BinaryBooleanInspectorImpl.INSTANCE);
+     RecordDescriptor selectDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+     File resultFile = new File(PATH_ACTUAL + SEPARATOR + "scanSortGbySelectWrite.out");
+     IPrinterFactory[] printers = { IntegerPrinterFactory.INSTANCE };
+     SinkWriterRuntimeFactory sink = new SinkWriterRuntimeFactory(new int[] { 0 }, printers, resultFile,
+             PrinterBasedWriterFactory.INSTANCE, selectDesc);
+
+     IPushRuntimeFactory[] stages = { select, sink };
+     RecordDescriptor[] stageDescs = { selectDesc, null };
+     AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, stages,
+             stageDescs);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, metaOp,
+             new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+     // scan -> sort -> group-by -> meta operator
+     spec.connect(new OneToOneConnectorDescriptor(spec), scan, 0, sorter, 0);
+     spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, grouper, 0);
+     spec.connect(new OneToOneConnectorDescriptor(spec), grouper, 0, metaOp, 0);
+     spec.addRoot(metaOp);
+
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+     StringBuilder written = new StringBuilder();
+     readFileToString(resultFile, written);
+     Assert.assertEquals("9", written.toString());
+     resultFile.delete();
+ }
+
+ /**
+  * Scans data/tpch0.001/customer.tbl, groups it on field 3 with a hash group-by that counts tuples,
+  * and then runs a meta operator that selects on IntegerEqualsEvalFactory(constant 3, column 0),
+  * projects column 1 and writes it to a file. The sink file scanHashGbySelectWrite.out is expected
+  * to contain "9".
+  */
+ @Test
+ public void scanHashGbySelectWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+
+     // file scan over the customer table
+     FileSplit[] splits = { new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+             "data/tpch0.001/customer.tbl"))) };
+     IFileSplitProvider splitSource = new ConstantFileSplitProvider(splits);
+     RecordDescriptor scanDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+     IValueParserFactory[] fieldParsers = { IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+     FileScanOperatorDescriptor scan = new FileScanOperatorDescriptor(spec, splitSource,
+             new DelimitedDataTupleParserFactory(fieldParsers, '|'), scanDesc);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
+             new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+     // hash group-by on field 3 with a tuple-count aggregate
+     RecordDescriptor groupDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+     IBinaryHashFunctionFactory[] hashFunctions = { IntegerBinaryHashFunctionFactory.INSTANCE };
+     ITuplePartitionComputerFactory partitioner = new FieldHashPartitionComputerFactory(new int[] { 3 },
+             hashFunctions);
+     IAggregateFunctionFactory[] aggFunctions = { new TupleCountAggregateFunctionFactory() };
+     IAccumulatingAggregatorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(
+             aggFunctions, new int[] { 3 }, new int[] {});
+     IBinaryComparatorFactory[] groupComparators = { IntegerBinaryComparatorFactory.INSTANCE };
+     HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, new int[] { 3 }, partitioner,
+             groupComparators, aggregatorFactory, groupDesc, 1024);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+             new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+     // select + sink inside one meta operator; the condition picks the Canadian customers
+     IEvaluatorFactory predicate = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
+             new ColumnAccessEvalFactory(0));
+     StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(predicate, new int[] { 1 },
+             BinaryBooleanInspectorImpl.INSTANCE);
+     RecordDescriptor selectDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+     File resultFile = new File(PATH_ACTUAL + SEPARATOR + "scanHashGbySelectWrite.out");
+     IPrinterFactory[] printers = { IntegerPrinterFactory.INSTANCE };
+     SinkWriterRuntimeFactory sink = new SinkWriterRuntimeFactory(new int[] { 0 }, printers, resultFile,
+             PrinterBasedWriterFactory.INSTANCE, selectDesc);
+
+     IPushRuntimeFactory[] stages = { select, sink };
+     RecordDescriptor[] stageDescs = { selectDesc, null };
+     AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, stages,
+             stageDescs);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, metaOp,
+             new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+     // scan -> group-by -> meta operator
+     spec.connect(new OneToOneConnectorDescriptor(spec), scan, 0, grouper, 0);
+     spec.connect(new OneToOneConnectorDescriptor(spec), grouper, 0, metaOp, 0);
+     spec.addRoot(metaOp);
+
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+     StringBuilder written = new StringBuilder();
+     readFileToString(resultFile, written);
+     Assert.assertEquals("9", written.toString());
+     resultFile.delete();
+ }
+
+ @Test
+ public void etsUnnestRunningaggregateWrite() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+ RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+ IUnnestingFunctionFactory aggregFactory = new IntArrayUnnester(new int[] { 100, 200, 300 });
+ UnnestRuntimeFactory unnest = new UnnestRuntimeFactory(0, aggregFactory, new int[] { 0 });
+ RecordDescriptor unnestDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+ RunningAggregateRuntimeFactory ragg = new RunningAggregateRuntimeFactory(new int[] { 1 },
+ new IRunningAggregateFunctionFactory[] { new TupleCountRunningAggregateFunctionFactory() }, new int[] {
+ 0, 1 });
+ RecordDescriptor raggDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+ String filePath = PATH_ACTUAL + SEPARATOR + "etsUnnestRunningaggregateWrite.out";
+ File outFile = new File(filePath);
+ SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 1 },
+ new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+ raggDesc);
+
+ AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+ new IPushRuntimeFactory[] { ets, unnest, ragg, writer }, new RecordDescriptor[] { etsDesc, unnestDesc,
+ raggDesc, null });
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
+ new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+ spec.addRoot(algebricksOp);
+ AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+ StringBuilder buf = new StringBuilder();
+ readFileToString(outFile, buf);
+ Assert.assertEquals("123", buf.toString());
+ outFile.delete();
+ }
+
+ /**
+  * Chains an empty tuple source, an assign of the integer constants 400 and 3, a
+  * StringStreamingRuntimeFactory that runs an external "idscript" command, and a file sink. The sink
+  * file etsAssignScriptWrite.out is expected to contain "400; 3".
+  * <p>
+  * The command is chosen from the "os.name" system property: a bash script on "Linux", a .cmd file on
+  * names starting with "Windows". On any other platform the method returns without running a job.
+  */
+ @Test
+ public void etsAssignScriptWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+     IntegerConstantEvalFactory fourHundred = new IntegerConstantEvalFactory(400);
+     IntegerConstantEvalFactory three = new IntegerConstantEvalFactory(3);
+
+     EmptyTupleSourceRuntimeFactory source = new EmptyTupleSourceRuntimeFactory();
+     RecordDescriptor sourceDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+
+     IEvaluatorFactory[] assignedValues = { fourHundred, three };
+     AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, assignedValues,
+             new int[] { 0, 1 });
+     RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+     IValueParserFactory[] scriptOutputParsers = { IntegerParserFactory.INSTANCE,
+             IntegerParserFactory.INSTANCE };
+
+     String osName = System.getProperty("os.name");
+     String command;
+     if (osName.startsWith("Windows")) {
+         command = "target\\testscripts\\idscript.cmd";
+     } else if (osName.equals("Linux")) {
+         command = "bash target/testscripts/idscript";
+     } else {
+         // no script variant is known for this platform, so there is nothing to test
+         return;
+     }
+
+     IPrinterFactory[] scriptInputPrinters = { IntegerPrinterFactory.INSTANCE,
+             IntegerPrinterFactory.INSTANCE };
+     StringStreamingRuntimeFactory script = new StringStreamingRuntimeFactory(command, scriptInputPrinters,
+             ' ', new DelimitedDataTupleParserFactory(scriptOutputParsers, ' '));
+     RecordDescriptor scriptDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+     File resultFile = new File(PATH_ACTUAL + SEPARATOR + "etsAssignScriptWrite.out");
+     IPrinterFactory[] sinkPrinters = { IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE };
+     SinkWriterRuntimeFactory sink = new SinkWriterRuntimeFactory(new int[] { 0, 1 }, sinkPrinters,
+             resultFile, PrinterBasedWriterFactory.INSTANCE, scriptDesc);
+
+     IPushRuntimeFactory[] stages = { source, assign, script, sink };
+     RecordDescriptor[] stageDescs = { sourceDesc, assignDesc, scriptDesc, null };
+     AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0, stages,
+             stageDescs);
+     String[] metaOpLocation = { AlgebricksHyracksIntegrationUtil.NC1_ID };
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, metaOp, metaOpLocation);
+
+     spec.addRoot(metaOp);
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+     StringBuilder written = new StringBuilder();
+     readFileToString(resultFile, written);
+     Assert.assertEquals("400; 3", written.toString());
+     resultFile.delete();
+ }
+
+ /**
+  * Scans data/tpch0.001/customer.tbl as a single UTF8 string field (the parser is configured with the
+  * NUL character as delimiter), feeds the stream into a SplitOperatorDescriptor with two outputs,
+  * writes each output to its own temporary file, and checks every output file against the input
+  * file via compareFiles.
+  * <p>
+  * Each temporary file is deleted once its comparison has succeeded, in the same way the other tests
+  * of this class delete their output file after their assertion; previously the files created by
+  * File.createTempFile were never removed and accumulated in the temp directory.
+  */
+ @Test
+ public void scanSplitWrite() throws Exception {
+     final int outputArity = 2;
+
+     JobSpecification spec = new JobSpecification();
+
+     String inputFileName = "data/tpch0.001/customer.tbl";
+     File inputFile = new File(inputFileName);
+     File[] outputFile = new File[outputArity];
+     for (int i = 0; i < outputArity; i++) {
+         outputFile[i] = File.createTempFile("splitop", null);
+     }
+
+     FileSplit[] inputSplits = new FileSplit[] { new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID,
+             new FileReference(inputFile)) };
+
+     DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
+             new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
+     RecordDescriptor stringRec = new RecordDescriptor(
+             new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+     FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider(
+             inputSplits), stringParser, stringRec);
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp,
+             new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+     SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp,
+             new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+     // one line-file writer per split output
+     IOperatorDescriptor[] outputOp = new IOperatorDescriptor[outputFile.length];
+     for (int i = 0; i < outputArity; i++) {
+         outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { new FileSplit(
+                 AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i])) });
+         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i],
+                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+     }
+
+     spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+     for (int i = 0; i < outputArity; i++) {
+         spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
+     }
+
+     for (int i = 0; i < outputArity; i++) {
+         spec.addRoot(outputOp[i]);
+     }
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+     for (int i = 0; i < outputArity; i++) {
+         compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+         // reached only when the comparison did not throw; a mismatching file is kept for inspection
+         outputFile[i].delete();
+     }
+ }
+
+ /**
+  * Scans data/tpch0.001/nation.tbl with a four-field schema and connects the scanner one-to-one to a
+  * meta operator holding an in-memory sort on field 1 (UTF8 string comparator) followed by a file
+  * sink over all four columns. The written file scanMicroSortWrite.out under PATH_ACTUAL is checked
+  * against the file of the same name under PATH_EXPECTED via compareFiles.
+  */
+ @Test
+ public void scanMicroSortWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+
+     // file scan over the nation table
+     FileSplit[] splits = { new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+             "data/tpch0.001/nation.tbl"))) };
+     IFileSplitProvider splitSource = new ConstantFileSplitProvider(splits);
+     RecordDescriptor scanDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+     IValueParserFactory[] fieldParsers = { IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+     FileScanOperatorDescriptor scan = new FileScanOperatorDescriptor(spec, splitSource,
+             new DelimitedDataTupleParserFactory(fieldParsers, '|'), scanDesc);
+     String[] scanLocation = { AlgebricksHyracksIntegrationUtil.NC1_ID };
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, scanLocation);
+
+     // sort + sink inside one meta operator; the sort output keeps the scan descriptor
+     IBinaryComparatorFactory[] sortComparators = { UTF8StringBinaryComparatorFactory.INSTANCE };
+     InMemorySortRuntimeFactory sorter = new InMemorySortRuntimeFactory(new int[] { 1 }, null,
+             sortComparators, null);
+
+     String outputName = "scanMicroSortWrite.out";
+     String actualPath = PATH_ACTUAL + SEPARATOR + outputName;
+     String expectedPath = PATH_EXPECTED + SEPARATOR + outputName;
+     File resultFile = new File(actualPath);
+     IPrinterFactory[] printers = { IntegerPrinterFactory.INSTANCE, UTF8StringPrinterFactory.INSTANCE,
+             IntegerPrinterFactory.INSTANCE, UTF8StringPrinterFactory.INSTANCE };
+     SinkWriterRuntimeFactory sink = new SinkWriterRuntimeFactory(new int[] { 0, 1, 2, 3 }, printers,
+             resultFile, PrinterBasedWriterFactory.INSTANCE, scanDesc);
+
+     IPushRuntimeFactory[] stages = { sorter, sink };
+     RecordDescriptor[] stageDescs = { scanDesc, null };
+     AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, stages,
+             stageDescs);
+     PartitionConstraintHelper.addPartitionCountConstraint(spec, metaOp, 1);
+
+     spec.connect(new OneToOneConnectorDescriptor(spec), scan, 0, metaOp, 0);
+     spec.addRoot(metaOp);
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+     compareFiles(actualPath, expectedPath);
+     resultFile.delete();
+ }
+
+ /**
+  * Chains an empty tuple source, an assign of the integer constant 400, a subplan, a projection onto
+  * column 1 and a file sink inside one meta operator. The subplan's nested pipeline is a nested
+  * tuple source, an assign of IntegerAddEvalFactory(column 0, constant 3) into column 1, and a
+  * projection onto column 1. The sink file etsAssignSubplanProjectWrite.out is expected to contain
+  * "403".
+  */
+ @Test
+ public void etsAssignSubplanProjectWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+     IntegerConstantEvalFactory fourHundred = new IntegerConstantEvalFactory(400);
+     IntegerConstantEvalFactory three = new IntegerConstantEvalFactory(3);
+
+     EmptyTupleSourceRuntimeFactory source = new EmptyTupleSourceRuntimeFactory();
+     RecordDescriptor sourceDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+
+     // outer assign: column 0 := 400
+     IEvaluatorFactory[] outerValues = { fourHundred };
+     AssignRuntimeFactory outerAssign = new AssignRuntimeFactory(new int[] { 0 }, outerValues,
+             new int[] { 0 });
+     RecordDescriptor outerAssignDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+     // nested pipeline: tuple source -> assign(column 1 from the add evaluator) -> project column 1
+     NestedTupleSourceRuntimeFactory nestedSource = new NestedTupleSourceRuntimeFactory();
+
+     IEvaluatorFactory[] nestedValues = { new IntegerAddEvalFactory(new ColumnAccessEvalFactory(0), three) };
+     AssignRuntimeFactory nestedAssign = new AssignRuntimeFactory(new int[] { 1 }, nestedValues,
+             new int[] { 0, 1 });
+     RecordDescriptor nestedAssignDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+     StreamProjectRuntimeFactory nestedProject = new StreamProjectRuntimeFactory(new int[] { 1 });
+     RecordDescriptor nestedProjectDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+     IPushRuntimeFactory[] nestedStages = { nestedSource, nestedAssign, nestedProject };
+     RecordDescriptor[] nestedDescs = { outerAssignDesc, nestedAssignDesc, nestedProjectDesc };
+     AlgebricksPipeline nestedPlan = new AlgebricksPipeline(nestedStages, nestedDescs);
+
+     INullWriterFactory[] nullWriters = { NoopNullWriterFactory.INSTANCE };
+     SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(nestedPlan, nullWriters, outerAssignDesc,
+             null);
+     RecordDescriptor subplanDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+     // outer projection onto column 1 of the subplan output
+     StreamProjectRuntimeFactory outerProject = new StreamProjectRuntimeFactory(new int[] { 1 });
+     RecordDescriptor outerProjectDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+     File resultFile = new File(PATH_ACTUAL + SEPARATOR + "etsAssignSubplanProjectWrite.out");
+     IPrinterFactory[] printers = { IntegerPrinterFactory.INSTANCE };
+     SinkWriterRuntimeFactory sink = new SinkWriterRuntimeFactory(new int[] { 0 }, printers, resultFile,
+             PrinterBasedWriterFactory.INSTANCE, outerProjectDesc);
+
+     IPushRuntimeFactory[] stages = { source, outerAssign, subplan, outerProject, sink };
+     RecordDescriptor[] stageDescs = { sourceDesc, outerAssignDesc, subplanDesc, outerProjectDesc, null };
+     AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0, stages,
+             stageDescs);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, metaOp, DEFAULT_NODES);
+
+     spec.addRoot(metaOp);
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+     StringBuilder written = new StringBuilder();
+     readFileToString(resultFile, written);
+     Assert.assertEquals("403", written.toString());
+     resultFile.delete();
+ }
+
+ /**
+  * Scans the '|'-delimited TPC-H customer table and feeds it, through a one-to-one connector, into a single
+  * Algebricks meta operator that sorts on column 3, groups on that column with a nested tuple-count
+  * aggregate, selects the group whose first output column equals 3, and writes one integer column to a file.
+  * The file is then expected to contain exactly "9".
+  */
+ @Test
+ public void scanMicroSortGbySelectWrite() throws Exception {
+     JobSpecification spec = new JobSpecification();
+     final String nc1 = AlgebricksHyracksIntegrationUtil.NC1_ID;
+
+     // Stage 1: the file scan operator, pinned to NC1.
+     File customerTable = new File("data/tpch0.001/customer.tbl");
+     FileSplit[] customerSplits = new FileSplit[] { new FileSplit(nc1, new FileReference(customerTable)) };
+     IFileSplitProvider customerSplitProvider = new ConstantFileSplitProvider(customerSplits);
+     ISerializerDeserializer[] customerSerdes = new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+             UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+     RecordDescriptor customerDesc = new RecordDescriptor(customerSerdes);
+     // One parser per column, in the same order as the serializers above.
+     IValueParserFactory[] customerParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+             UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+             UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+             UTF8StringParserFactory.INSTANCE };
+     DelimitedDataTupleParserFactory customerTupleParser = new DelimitedDataTupleParserFactory(customerParsers,
+             '|');
+     FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, customerSplitProvider,
+             customerTupleParser, customerDesc);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, new String[] { nc1 });
+
+     // Stage 2: the in-memory sort on column 3 (nation id); its output uses the scanner's descriptor.
+     int[] sortColumns = new int[] { 3 };
+     IBinaryComparatorFactory[] sortComparators = new IBinaryComparatorFactory[] {
+             IntegerBinaryComparatorFactory.INSTANCE };
+     InMemorySortRuntimeFactory sortRuntime = new InMemorySortRuntimeFactory(sortColumns, null, sortComparators,
+             null);
+     RecordDescriptor sortedDesc = customerDesc;
+
+     // Stage 3: the group-by, whose nested plan is a tuple source followed by a tuple-count aggregate.
+     NestedTupleSourceRuntimeFactory nestedSource = new NestedTupleSourceRuntimeFactory();
+     IAggregateFunctionFactory[] countFunctions = new IAggregateFunctionFactory[] {
+             new TupleCountAggregateFunctionFactory() };
+     AggregateRuntimeFactory countRuntime = new AggregateRuntimeFactory(countFunctions);
+     RecordDescriptor countDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+     IPushRuntimeFactory[] nestedRuntimes = { nestedSource, countRuntime };
+     RecordDescriptor[] nestedDescs = { sortedDesc, countDesc };
+     AlgebricksPipeline nestedPlan = new AlgebricksPipeline(nestedRuntimes, nestedDescs);
+     NestedPlansAccumulatingAggregatorFactory groupAggregator = new NestedPlansAccumulatingAggregatorFactory(
+             new AlgebricksPipeline[] { nestedPlan }, new int[] { 3 }, new int[] {});
+     // Two integer columns come out of the group-by (presumably group key, then count -- confirm against
+     // MicroPreClusteredGroupRuntimeFactory).
+     RecordDescriptor groupedDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+             IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+     MicroPreClusteredGroupRuntimeFactory groupRuntime = new MicroPreClusteredGroupRuntimeFactory(
+             new int[] { 3 }, new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
+             groupAggregator, sortedDesc, groupedDesc, null);
+
+     // Stage 4: the select; keeps tuples whose column 0 equals 3.
+     IEvaluatorFactory keyIsThree = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
+             new ColumnAccessEvalFactory(0)); // Canadian customers
+     StreamSelectRuntimeFactory selectRuntime = new StreamSelectRuntimeFactory(keyIsThree, new int[] { 1 },
+             BinaryBooleanInspectorImpl.INSTANCE);
+     RecordDescriptor selectedDesc = new RecordDescriptor(
+             new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+     // Stage 5: the sink that prints column 0 of the selected tuples into the output file.
+     File outFile = new File(PATH_ACTUAL + SEPARATOR + "scanSortGbySelectWrite.out");
+     SinkWriterRuntimeFactory sinkRuntime = new SinkWriterRuntimeFactory(new int[] { 0 },
+             new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile,
+             PrinterBasedWriterFactory.INSTANCE, selectedDesc);
+
+     // The algebricks op.: chains sort, group-by, select and sink; the sink has no output descriptor.
+     IPushRuntimeFactory[] metaRuntimes = { sortRuntime, groupRuntime, selectRuntime, sinkRuntime };
+     RecordDescriptor[] metaDescs = { sortedDesc, groupedDesc, selectedDesc, null };
+     AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, metaRuntimes,
+             metaDescs);
+     PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, metaOp, new String[] { nc1 });
+
+     spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, metaOp, 0);
+     spec.addRoot(metaOp);
+
+     // Run the job, then compare the produced file with the expected count and clean up.
+     AlgebricksHyracksIntegrationUtil.runJob(spec);
+     StringBuilder actual = new StringBuilder();
+     readFileToString(outFile, actual);
+     Assert.assertEquals("9", actual.toString());
+     outFile.delete();
+ }
+
+ /**
+  * Appends the content of {@code file} to {@code buf}, one line at a time.
+  * <p>
+  * Line terminators are dropped and nothing is inserted between consecutive lines, so a file holding the
+  * lines "a" and "b" is appended as "ab". The previous implementation carried a {@code first} flag that was
+  * never cleared, so its newline-joining branch could never run; the concatenating behaviour is kept on
+  * purpose because assertions that call this helper compare against that output.
+  *
+  * @param file
+  *            the file to read
+  * @param buf
+  *            the builder that receives the lines; existing content is kept
+  * @throws Exception
+  *             if the file cannot be opened or read
+  */
+ private static void readFileToString(File file, StringBuilder buf) throws Exception {
+     BufferedReader result = new BufferedReader(new FileReader(file));
+     try {
+         String s;
+         while ((s = result.readLine()) != null) {
+             buf.append(s);
+         }
+     } finally {
+         // Close the reader even when readLine() fails, so the file handle is not leaked.
+         result.close();
+     }
+ }
+
+ /**
+  * Asserts that two text files contain the same lines in the same order.
+  * <p>
+  * Lines are compared pairwise with {@code Assert.assertEquals}; after the last line of the first file the
+  * second file must be exhausted as well. Both readers are closed before the method returns, whether the
+  * comparison succeeds, an assertion fails, or reading throws.
+  *
+  * @param fileNameA
+  *            path of the first file
+  * @param fileNameB
+  *            path of the second file
+  * @throws IOException
+  *             if either file cannot be opened or read
+  */
+ public void compareFiles(String fileNameA, String fileNameB) throws IOException {
+     BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
+     try {
+         BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
+         try {
+             String lineA, lineB;
+             while ((lineA = fileA.readLine()) != null) {
+                 // lineB is null when the second file is shorter, which makes the assertion fail.
+                 lineB = fileB.readLine();
+                 Assert.assertEquals(lineA, lineB);
+             }
+             // The second file must not have any lines left over.
+             Assert.assertNull(fileB.readLine());
+         } finally {
+             fileB.close();
+         }
+     } finally {
+         fileA.close();
+     }
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
new file mode 100644
index 0000000..73dc16d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
@@ -0,0 +1,97 @@
+package edu.uci.ics.hyracks.algebricks.tests.tools;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.algebricks.core.utils.WriteValueTools;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ByteArrayAccessibleOutputStream;
+
+/**
+ * Unit tests for {@link WriteValueTools}. Each check writes a value into a byte-array output stream and
+ * compares the bytes produced with the bytes of the value's expected textual form.
+ */
+public class WriteValueTest {
+
+    /** Charset name used for every String/byte[] conversion, so results do not depend on the platform default. */
+    private static final String UTF8 = "UTF-8";
+
+    /** Output buffer shared by all checks; every helper resets it before writing. */
+    private final ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
+
+    /** Checks {@code writeInt} on small, negative and boundary int values. */
+    @Test
+    public void writeIntegers() throws Exception {
+        writeIntTest(6);
+        writeIntTest(1234);
+        writeIntTest(-1234);
+        writeIntTest(Integer.MAX_VALUE);
+        writeIntTest(Integer.MAX_VALUE - 1);
+        writeIntTest(Integer.MIN_VALUE);
+        writeIntTest(Integer.MIN_VALUE + 1);
+    }
+
+    /** Checks {@code writeLong} on zero, the int boundary values widened to long, and two ten-digit values. */
+    @Test
+    public void writeLongs() throws Exception {
+        writeLongTest(Integer.MAX_VALUE);
+        writeLongTest(Integer.MAX_VALUE - 1);
+        writeLongTest(Integer.MIN_VALUE);
+        writeLongTest(Integer.MIN_VALUE + 1);
+        writeLongTest(0L);
+        writeLongTest(1234567890L);
+        writeLongTest(-1234567890L);
+    }
+
+    /** Checks {@code writeUTF8String} on an ASCII string and on two strings with non-ASCII characters. */
+    @Test
+    public void writeUTF8Strings() throws Exception {
+        ByteArrayAccessibleOutputStream interm = new ByteArrayAccessibleOutputStream();
+        DataOutput dout = new DataOutputStream(interm);
+        writeUTF8Test("abcdefABCDEF", dout, interm);
+        writeUTF8Test("šťžľčěďňřůĺ", dout, interm);
+        writeUTF8Test("Ă㪺Ţţ", dout, interm);
+    }
+
+    /**
+     * Writes {@code i} with {@code WriteValueTools.writeInt} and expects the bytes of {@code Integer.toString(i)}.
+     *
+     * @throws Exception
+     *             if the written bytes differ from the expected ones
+     */
+    private void writeIntTest(int i) throws Exception {
+        baaos.reset();
+        WriteValueTools.writeInt(i, baaos);
+        String text = Integer.toString(i);
+        expectWritten(text, text.getBytes(UTF8));
+    }
+
+    /**
+     * Writes {@code x} with {@code WriteValueTools.writeLong} and expects the bytes of {@code Long.toString(x)}.
+     *
+     * @throws Exception
+     *             if the written bytes differ from the expected ones
+     */
+    private void writeLongTest(long x) throws Exception {
+        baaos.reset();
+        WriteValueTools.writeLong(x, baaos);
+        String text = Long.toString(x);
+        expectWritten(text, text.getBytes(UTF8));
+    }
+
+    /**
+     * Verifies that {@code baaos} currently holds exactly the bytes in {@code goal}.
+     *
+     * @param value
+     *            textual form of the value that was written, used in failure messages
+     * @param goal
+     *            the bytes expected in the buffer
+     * @throws Exception
+     *             if the length or any byte differs; the message shows decoded text rather than array
+     *             references
+     */
+    private void expectWritten(String value, byte[] goal) throws Exception {
+        if (baaos.size() != goal.length) {
+            throw new Exception("Expecting to write " + value + " in " + goal.length + " bytes, but found "
+                    + baaos.size() + " bytes.");
+        }
+        byte[] actual = baaos.getByteArray();
+        for (int k = 0; k < goal.length; k++) {
+            if (goal[k] != actual[k]) {
+                throw new Exception("Expecting to write " + value + " as " + new String(goal, UTF8)
+                        + ", but found " + new String(actual, 0, goal.length, UTF8) + " instead.");
+            }
+        }
+    }
+
+    /**
+     * Serializes {@code str} with {@code DataOutput.writeUTF}, passes the whole serialized form to
+     * {@code WriteValueTools.writeUTF8String}, and expects the UTF-8 bytes of {@code str} enclosed in double
+     * quotes.
+     *
+     * @param str
+     *            the string to write
+     * @param dout
+     *            data output that writes into {@code interm}
+     * @param interm
+     *            intermediate buffer that receives the {@code writeUTF} form; reset at the start
+     * @throws Exception
+     *             if the size, the surrounding quotes or any content byte is not as expected
+     */
+    private void writeUTF8Test(String str, DataOutput dout, ByteArrayAccessibleOutputStream interm) throws Exception {
+        interm.reset();
+        dout.writeUTF(str);
+        baaos.reset();
+        WriteValueTools.writeUTF8String(interm.getByteArray(), 0, interm.size(), baaos);
+        // Encode explicitly as UTF-8: the platform default charset may produce different bytes.
+        byte[] b = str.getBytes(UTF8);
+        // Two extra bytes account for the opening and closing quote.
+        int expectedSize = b.length + 2;
+        if (baaos.size() != expectedSize) {
+            throw new Exception("Expecting to write " + str + " in " + expectedSize + " bytes, but found "
+                    + baaos.size() + " bytes.");
+        }
+        byte[] actual = baaos.getByteArray();
+        if (actual[0] != '\"' || actual[baaos.size() - 1] != '\"') {
+            throw new Exception("Missing quotes.");
+        }
+        for (int k = 0; k < b.length; k++) {
+            if (b[k] != actual[k + 1]) {
+                throw new Exception("Expecting to write " + str + ", but found "
+                        + new String(actual, 1, b.length, UTF8) + " instead.");
+            }
+        }
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
new file mode 100644
index 0000000..d6c2165
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.util;
+
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Test helper that creates and starts one Hyracks cluster controller and two node controllers, and submits
+ * jobs to them. All state is static: call {@link #init()} once before running jobs and {@link #deinit()}
+ * afterwards.
+ */
+public class AlgebricksHyracksIntegrationUtil {
+
+    /** Node id of the first node controller started by {@link #init()}. */
+    public static final String NC1_ID = "nc1";
+    /** Node id of the second node controller started by {@link #init()}. */
+    public static final String NC2_ID = "nc2";
+
+    /** Not referenced within this class. */
+    public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
+
+    /** Port configured on the cluster controller and used by both node controllers to reach it. */
+    public static final int TEST_HYRACKS_CC_PORT = 4322;
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    /**
+     * Starts the cluster controller, then node controllers {@link #NC1_ID} and {@link #NC2_ID}, opens a local
+     * client connection to the cluster controller and creates the Algebricks application on it.
+     *
+     * @throws Exception
+     *             if any service fails to start or the application cannot be created
+     */
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.port = TEST_HYRACKS_CC_PORT;
+        // ccConfig.useJOL = true;
+        cc = new ClusterControllerService(ccConfig);
+        cc.start();
+
+        nc1 = startNodeController(NC1_ID);
+        nc2 = startNodeController(NC2_ID);
+
+        hcc = new HyracksLocalConnection(cc);
+        hcc.createApplication(AlgebricksConfig.HYRACKS_APP_NAME, null);
+    }
+
+    /** Creates and starts a node controller with the given id, pointing it at the test cluster controller. */
+    private static NodeControllerService startNodeController(String nodeId) throws Exception {
+        NCConfig ncConfig = new NCConfig();
+        ncConfig.ccHost = "localhost";
+        ncConfig.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig.dataIPAddress = "127.0.0.1";
+        ncConfig.nodeId = nodeId;
+        NodeControllerService nc = new NodeControllerService(ncConfig);
+        nc.start();
+        return nc;
+    }
+
+    /**
+     * Stops the second node controller, the first node controller and the cluster controller, in that order.
+     * <p>
+     * Every service that was created is stopped even when stopping an earlier one throws, and services that
+     * were never created (for example because {@link #init()} failed partway) are skipped. If several stops
+     * throw, the exception from the last failing stop is the one propagated.
+     *
+     * @throws Exception
+     *             if stopping any service fails
+     */
+    public static void deinit() throws Exception {
+        try {
+            if (nc2 != null) {
+                nc2.stop();
+            }
+        } finally {
+            try {
+                if (nc1 != null) {
+                    nc1.stop();
+                }
+            } finally {
+                if (cc != null) {
+                    cc.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a job for {@code spec} in the Algebricks application with the {@code PROFILE_RUNTIME} flag, starts
+     * it on the cluster controller and blocks until it completes. The JSON form of the specification and the
+     * job id are written to the Algebricks logger at INFO level.
+     *
+     * @param spec
+     *            the job to run
+     * @throws Exception
+     *             if the job cannot be created, started or waited for
+     */
+    public static void runJob(JobSpecification spec) throws Exception {
+        JobId jobId = hcc.createJob(AlgebricksConfig.HYRACKS_APP_NAME, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
+        cc.start(jobId);
+        AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
+        cc.waitForCompletion(jobId);
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/resources/results/scanMicroSortWrite.out b/hyracks-algebricks/hyracks-algebricks-tests/src/test/resources/results/scanMicroSortWrite.out
new file mode 100644
index 0000000..1c0fd6a
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/resources/results/scanMicroSortWrite.out
@@ -0,0 +1,25 @@
+0; "ALGERIA"; 0; " haggle. carefully final deposits detect slyly agai"
+1; "ARGENTINA"; 1; "al foxes promise slyly according to the regular accounts. bold requests alon"
+2; "BRAZIL"; 1; "y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special "
+3; "CANADA"; 1; "eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold"
+18; "CHINA"; 2; "c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos"
+4; "EGYPT"; 4; "y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d"
+5; "ETHIOPIA"; 0; "ven packages wake quickly. regu"
+6; "FRANCE"; 3; "refully final requests. regular, ironi"
+7; "GERMANY"; 3; "l platelets. regular accounts x-ray: unusual, regular acco"
+8; "INDIA"; 2; "ss excuses cajole slyly across the packages. deposits print aroun"
+9; "INDONESIA"; 2; " slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull"
+10; "IRAN"; 4; "efully alongside of the slyly final dependencies. "
+11; "IRAQ"; 4; "nic deposits boost atop the quickly final requests? quickly regula"
+12; "JAPAN"; 2; "ously. final, express gifts cajole a"
+13; "JORDAN"; 4; "ic deposits are blithely about the carefully regular pa"
+14; "KENYA"; 0; " pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t"
+15; "MOROCCO"; 0; "rns. blithely bold courts among the closely regular packages use furiously bold platelets?"
+16; "MOZAMBIQUE"; 0; "s. ironic, unusual asymptotes wake blithely r"
+17; "PERU"; 1; "platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun"
+19; "ROMANIA"; 3; "ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account"
+22; "RUSSIA"; 3; " requests against the platelets use never according to the quickly regular pint"
+20; "SAUDI ARABIA"; 4; "ts. silent requests haggle. closely express packages sleep across the blithely"
+23; "UNITED KINGDOM"; 3; "eans boost carefully special requests. accounts are. carefull"
+24; "UNITED STATES"; 1; "y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be"
+21; "VIETNAM"; 2; "hely enticingly express accounts. even, final "