Initial Algebricks integration

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_algebricks_integration@854 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java
new file mode 100644
index 0000000..ed98eec
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntArrayUnnester.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IUnnestingFunction;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class IntArrayUnnester implements IUnnestingFunctionFactory {
+
+    private int[] x;
+
+    public IntArrayUnnester(int[] x) {
+        this.x = x;
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException {
+
+        final DataOutput out = provider.getDataOutput();
+
+        return new IUnnestingFunction() {
+
+            private int pos;
+
+            @Override
+            public void init(IFrameTupleReference tuple) throws AlgebricksException {
+                pos = 0;
+            }
+
+            @Override
+            public boolean step() throws AlgebricksException {
+                try {
+                    if (pos < x.length) {
+                        // Writes one byte to distinguish between null
+                        // values and end of sequence.
+                        out.writeInt(x[pos]);
+                        ++pos;
+                        return true;
+                    } else {
+                        return false;
+                    }
+
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+
+        };
+
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
new file mode 100644
index 0000000..57c0789
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class IntegerAddEvalFactory implements IEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private IEvaluatorFactory evalLeftFactory;
+    private IEvaluatorFactory evalRightFactory;
+
+    public IntegerAddEvalFactory(IEvaluatorFactory evalLeftFactory, IEvaluatorFactory evalRightFactory) {
+        this.evalLeftFactory = evalLeftFactory;
+        this.evalRightFactory = evalRightFactory;
+    }
+
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+
+            private DataOutput out = output.getDataOutput();
+            private ArrayBackedValueStorage argOut = new ArrayBackedValueStorage();
+
+            private IEvaluator evalLeft = evalLeftFactory.createEvaluator(argOut);
+            private IEvaluator evalRight = evalRightFactory.createEvaluator(argOut);
+
+            @SuppressWarnings("static-access")
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                argOut.reset();
+                evalLeft.evaluate(tuple);
+                int v1 = IntegerSerializerDeserializer.INSTANCE.getInt(argOut.getBytes(), 0);
+                argOut.reset();
+                evalRight.evaluate(tuple);
+                int v2 = IntegerSerializerDeserializer.INSTANCE.getInt(argOut.getBytes(), 0);
+                try {
+                    out.writeInt(v1 + v2);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
new file mode 100644
index 0000000..880a4df
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class IntegerConstantEvalFactory implements IEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final int value;
+
+    public IntegerConstantEvalFactory(int value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "IntConstantEvalFactory " + value;
+    }
+
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+
+            private DataOutput out = output.getDataOutput();
+            private ArrayBackedValueStorage buf = new ArrayBackedValueStorage();
+            boolean first = true;
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                if (first) {
+                    first = false;
+                    try {
+                        IntegerSerializerDeserializer.INSTANCE.serialize(value, buf.getDataOutput());
+                    } catch (HyracksDataException e) {
+                        throw new AlgebricksException(e);
+                    }
+                }
+
+                try {
+                    out.write(buf.getBytes(), 0, buf.getLength());
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
new file mode 100644
index 0000000..e15851b
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class IntegerEqualsEvalFactory implements IEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private IEvaluatorFactory evalFact1, evalFact2;
+
+    public IntegerEqualsEvalFactory(IEvaluatorFactory evalFact1, IEvaluatorFactory evalFact2) {
+        this.evalFact1 = evalFact1;
+        this.evalFact2 = evalFact2;
+    }
+
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+            private DataOutput dataout = output.getDataOutput();
+            private ArrayBackedValueStorage out1 = new ArrayBackedValueStorage();
+            private ArrayBackedValueStorage out2 = new ArrayBackedValueStorage();
+            private IEvaluator eval1 = evalFact1.createEvaluator(out1);
+            private IEvaluator eval2 = evalFact2.createEvaluator(out2);
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                out1.reset();
+                eval1.evaluate(tuple);
+                out2.reset();
+                eval2.evaluate(tuple);
+                int v1 = IntegerSerializerDeserializer.getInt(out1.getBytes(), 0);
+                int v2 = IntegerSerializerDeserializer.getInt(out2.getBytes(), 0);
+                boolean r = v1 == v2;
+                try {
+                    dataout.writeBoolean(r);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
new file mode 100644
index 0000000..5c5d6d4
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class IntegerGreaterThanEvalFactory implements IEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private IEvaluatorFactory evalFact1, evalFact2;
+
+    public IntegerGreaterThanEvalFactory(IEvaluatorFactory evalFact1, IEvaluatorFactory evalFact2) {
+        this.evalFact1 = evalFact1;
+        this.evalFact2 = evalFact2;
+    }
+
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+            private DataOutput dataout = output.getDataOutput();
+            private ArrayBackedValueStorage out1 = new ArrayBackedValueStorage();
+            private ArrayBackedValueStorage out2 = new ArrayBackedValueStorage();
+            private IEvaluator eval1 = evalFact1.createEvaluator(out1);
+            private IEvaluator eval2 = evalFact2.createEvaluator(out2);
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                out1.reset();
+                eval1.evaluate(tuple);
+                out2.reset();
+                eval2.evaluate(tuple);
+                int v1 = IntegerSerializerDeserializer.getInt(out1.getBytes(), 0);
+                int v2 = IntegerSerializerDeserializer.getInt(out2.getBytes(), 0);
+                boolean r = v1 > v2;
+                try {
+                    dataout.writeBoolean(r);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/script/IdentityStreamingScript.java b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/script/IdentityStreamingScript.java
new file mode 100644
index 0000000..c914c7c
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/java/edu/uci/ics/hyracks/algebricks/tests/script/IdentityStreamingScript.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.script;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+public class IdentityStreamingScript {
+
+    public static void main(String args[]) throws IOException {
+        BufferedReader r = new BufferedReader(new InputStreamReader(System.in));
+        String s;
+        while ((s = r.readLine()) != null) {
+            System.out.println(s);
+        }
+        r.close();
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.cmd b/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.cmd
new file mode 100644
index 0000000..b8eb4a0
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.cmd
@@ -0,0 +1,63 @@
+@ECHO OFF
+SETLOCAL
+
+:: Licensed to the Apache Software Foundation (ASF) under one or more
+:: contributor license agreements.  See the NOTICE file distributed with
+:: this work for additional information regarding copyright ownership.
+:: The ASF licenses this file to You under the Apache License, Version 2.0
+:: (the "License"); you may not use this file except in compliance with
+:: the License.  You may obtain a copy of the License at
+::
+::     http://www.apache.org/licenses/LICENSE-2.0
+::
+:: Unless required by applicable law or agreed to in writing, software
+:: distributed under the License is distributed on an "AS IS" BASIS,
+:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+:: See the License for the specific language governing permissions and
+:: limitations under the License.
+
+:: JAVA classpath
+:: Use the local variable CLASSPATH to add custom entries (e.g. JDBC drivers) to
+:: the classpath. Separate multiple paths with ":". Enclose the value
+:: in double quotes. Adding additional files or locations on separate
+:: lines makes things clearer.
+:: Note: If under running under cygwin use "/cygdrive/c/..." for "C:/..."
+:: Example:
+::
+::     Set the CLASSPATH to a jar file and a directory.  Note that
+::     "classes dir" is a directory of class files with a space in the name.
+::
+:: CLASSPATH="usr/local/Product1/lib/product.jar"
+:: CLASSPATH="${CLASSPATH}:../MyProject/classes dir"
+::
+SET CLASSPATH="@classpath@"
+
+:: JVM parameters
+:: If you want to modify the default parameters (e.g. maximum heap size -Xmx)
+:: for the Java virtual machine set the local variable JVM_PARAMETERS below
+:: Example:
+:: JVM_PARAMETERS=-Xms100M -Xmx200M
+::
+:: Below are the JVM parameters needed to do remote debugging using Intellij
+:: IDEA.  Uncomment and then do: JVM_PARAMETERS="$IDEA_REMOTE_DEBUG_PARAMS"
+:: IDEA_REMOTE_DEBUG_PARAMS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"
+::
+:: JVM_PARAMETERS=
+
+:: ---------------------------------------------------------------------------
+:: Default configuration. Do not modify below this line.
+:: ---------------------------------------------------------------------------
+:: Application specific parameters
+
+SET MAIN_CLASS=@main.class@
+SET JVM_PARAMS=@jvm.params@
+SET PROGRAM_PARAMS=@program.params@
+
+:: Try to find java virtual machine
+IF NOT DEFINED JAVA (
+  IF NOT DEFINED JAVA_HOME SET JAVA="java.exe"
+  IF DEFINED JAVA_HOME SET JAVA="%JAVA_HOME%\bin\java.exe"
+)
+
+:: Run program
+%JAVA% %JVM_PARAMS% %JVM_PARAMETERS% -classpath %CLASSPATH% %MAIN_CLASS% %PROGRAM_PARAMS% %*
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.sh b/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.sh
new file mode 100644
index 0000000..a998626
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/main/scripts/run.sh
@@ -0,0 +1,81 @@
+#!/bin/sh
+# JAVA classpath
+# Use the local variable CLASSPATH to add custom entries (e.g. JDBC drivers) to
+# the classpath. Separate multiple paths with ":". Enclose the value
+# in double quotes. Adding additional files or locations on separate
+# lines makes things clearer.
+# Note: If under running under cygwin use "/cygdrive/c/..." for "C:/..."
+# Example:
+#
+#     Set the CLASSPATH to a jar file and a directory.  Note that
+#     "classes dir" is a directory of class files with a space in the name.
+#
+# CLASSPATH="usr/local/Product1/lib/product.jar"
+# CLASSPATH="${CLASSPATH}:../MyProject/classes dir"
+#
+CLASSPATH="@classpath@"
+
+# JVM parameters
+# If you want to modify the default parameters (e.g. maximum heap size -Xmx)
+# for the Java virtual machine set the local variable JVM_PARAMETERS below
+# Example:
+# JVM_PARAMETERS=-Xms100M -Xmx200M
+#
+# Below are the JVM parameters needed to do remote debugging using Intellij
+# IDEA.  Uncomment and then do: JVM_PARAMETERS="$IDEA_REMOTE_DEBUG_PARAMS"
+# IDEA_REMOTE_DEBUG_PARAMS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"
+#
+# JVM_PARAMETERS=
+
+#run with shared memory setup
+#if [ -n "${RUN_SHARED_MEM}"]; then
+#  JVM_PARAMETERS="${JVM_PARAMETERS} -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=n,address=javadebug,suspend=y"
+#fi
+
+# ---------------------------------------------------------------------------
+# Default configuration. Do not modify below this line.
+# ---------------------------------------------------------------------------
+# Application specific parameters
+
+MAIN_CLASS="@main.class@"
+JVM_PARAMS="@jvm.params@"
+PROGRAM_PARAMS="@program.params@"
+
+# Cygwin support.  $cygwin _must_ be set to either true or false.
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  *) cygwin=false ;;
+esac
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin; then
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# Try to find java virtual machine
+if [ -z "${JAVA}" ];  then
+  if [ -z "${JAVA_HOME}" ]; then
+    JAVA=java
+  else
+    JAVA=${JAVA_HOME}/bin/java
+  fi
+fi
+
+# Try to find directory where this script is located
+COMMAND="${PWD}/$0"
+if [ ! -f "${COMMAND}" ]; then
+	COMMAND="$0"
+fi
+BASEDIR=`expr "${COMMAND}" : '\(.*\)/\.*'`
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+#  JAVA=`cygpath --path --windows "$JAVA"`
+  CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+fi
+
+# Run program
+${JAVA} ${JVM_PARAMS} ${JVM_PARAMETERS} -classpath "${CLASSPATH}" ${MAIN_CLASS} ${PROGRAM_PARAMS} $*
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
new file mode 100644
index 0000000..6bd055d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -0,0 +1,893 @@
+package edu.uci.ics.hyracks.algebricks.tests.pushruntime;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.aggregators.TupleCountAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.aggregators.TupleCountRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.evaluators.ColumnAccessEvalFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.BinaryBooleanInspectorImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.BinaryIntegerInspectorImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.IntegerPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.NoopNullWriterFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.data.UTF8StringPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.aggreg.AggregateRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.aggreg.SimpleAlgebricksAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.group.MicroPreClusteredGroupRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.SubplanRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.sort.InMemorySortRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.NestedTupleSourceRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.PrinterRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.RunningAggregateRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.StreamLimitRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.StreamProjectRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.StreamSelectRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.StringStreamingRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.UnnestRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.writers.PrinterBasedWriterFactory;
+import edu.uci.ics.hyracks.algebricks.tests.util.AlgebricksHyracksIntegrationUtil;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.IntegerBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
+
+public class PushRuntimeTest {
+
+    private static final String SEPARATOR = System.getProperty("file.separator");
+    private static final String PATH_ACTUAL = "rttest";
+    private static final String PATH_BASE = "src" + SEPARATOR + "test" + SEPARATOR + "resources";
+    private static final String PATH_EXPECTED = PATH_BASE + SEPARATOR + "results";
+
+    private static final String[] DEFAULT_NODES = new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID };
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+        AlgebricksHyracksIntegrationUtil.init();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        AlgebricksHyracksIntegrationUtil.deinit();
+        File outdir = new File(PATH_ACTUAL);
+        File[] files = outdir.listFiles();
+        if (files == null || files.length == 0) {
+            outdir.delete();
+        }
+    }
+
+    @Test
+    public void etsAssignPrint() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
+        IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
+
+        EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+        RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, new IEvaluatorFactory[] { const1,
+                const2 }, new int[] { 0, 1 });
+        RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        PrinterRuntimeFactory printer = new PrinterRuntimeFactory(new int[] { 0, 1 }, new IPrinterFactory[] {
+                IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE }, assignDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+                new IPushRuntimeFactory[] { ets, assign, printer },
+                new RecordDescriptor[] { etsDesc, assignDesc, null });
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp, DEFAULT_NODES);
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+    }
+
+    @Test
+    public void etsAssignWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
+        IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
+
+        EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+        RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, new IEvaluatorFactory[] { const1,
+                const2 }, new int[] { 0, 1 });
+        RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "etsAssignWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1 }, new IPrinterFactory[] {
+                IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE }, outFile,
+                PrinterBasedWriterFactory.INSTANCE, assignDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+                new IPushRuntimeFactory[] { ets, assign, writer }, new RecordDescriptor[] { etsDesc, assignDesc, null });
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp, DEFAULT_NODES);
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("400; 3", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void scanSelectWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        // the scanner
+        FileSplit[] intFileSplits = new FileSplit[1];
+        intFileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+                "data/simple/int-part1.tbl")));
+        IFileSplitProvider intSplitProvider = new ConstantFileSplitProvider(intFileSplits);
+        RecordDescriptor intScannerDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE };
+        FileScanOperatorDescriptor intScanner = new FileScanOperatorDescriptor(spec, intSplitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), intScannerDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, intScanner, DEFAULT_NODES);
+
+        // the algebricks op.
+        IEvaluatorFactory cond = new IntegerGreaterThanEvalFactory(new IntegerConstantEvalFactory(2),
+                new ColumnAccessEvalFactory(0));
+        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(cond, new int[] { 0 },
+                BinaryBooleanInspectorImpl.INSTANCE);
+        RecordDescriptor selectDesc = intScannerDesc;
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "scanSelectWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                selectDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { select, writer }, new RecordDescriptor[] { selectDesc, null });
+
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, algebricksOp, 1);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), intScanner, 0, algebricksOp, 0);
+
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("0", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void etsAssignProjectWrite() throws Exception {
+
+        JobSpecification spec = new JobSpecification();
+        IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
+        IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
+
+        EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+        RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, new IEvaluatorFactory[] { const1,
+                const2 }, new int[] { 0, 1 });
+        RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        StreamProjectRuntimeFactory project = new StreamProjectRuntimeFactory(new int[] { 1 });
+        RecordDescriptor projectDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "etsAssignProjectWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                projectDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+                new IPushRuntimeFactory[] { ets, assign, project, writer }, new RecordDescriptor[] { etsDesc,
+                        assignDesc, projectDesc, null });
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp, DEFAULT_NODES);
+
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("3", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void scanLimitWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        // the scanner
+        FileSplit[] fileSplits = new FileSplit[1];
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl")));
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
+
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE };
+        FileScanOperatorDescriptor scanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanner, DEFAULT_NODES);
+
+        // the algebricks op.
+        StreamLimitRuntimeFactory limit = new StreamLimitRuntimeFactory(new IntegerConstantEvalFactory(2), null,
+                new int[] { 0 }, BinaryIntegerInspectorImpl.INSTANCE);
+        RecordDescriptor limitDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "scanLimitWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                limitDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { limit, writer }, new RecordDescriptor[] { limitDesc, null });
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, algebricksOp, 0);
+
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("12", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void etsUnnestWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+        RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+        IUnnestingFunctionFactory aggregFactory = new IntArrayUnnester(new int[] { 100, 200, 300 });
+        UnnestRuntimeFactory unnest = new UnnestRuntimeFactory(0, aggregFactory, new int[] { 0 });
+        RecordDescriptor unnestDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "etsUnnestWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                unnestDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+                new IPushRuntimeFactory[] { ets, unnest, writer }, new RecordDescriptor[] { etsDesc, unnestDesc, null });
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("100200300", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void scanAggregateWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        // the scanner
+        FileSplit[] fileSplits = new FileSplit[1];
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer-part1.tbl")));
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE };
+        FileScanOperatorDescriptor scanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanner,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        // the algebricks op.
+        AggregateRuntimeFactory agg = new AggregateRuntimeFactory(
+                new IAggregateFunctionFactory[] { new TupleCountAggregateFunctionFactory() });
+        RecordDescriptor aggDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "scanAggregateWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                aggDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { agg, writer }, new RecordDescriptor[] { aggDesc, null });
+
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, algebricksOp, 1);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, algebricksOp, 0);
+
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("75", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void scanSortGbySelectWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        // the scanner
+        FileSplit[] fileSplits = new FileSplit[1];
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl")));
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE };
+        FileScanOperatorDescriptor scanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanner,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        // the sort (by nation id)
+        RecordDescriptor sortDesc = scannerDesc;
+        InMemorySortOperatorDescriptor sort = new InMemorySortOperatorDescriptor(spec, new int[] { 3 },
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, sortDesc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sort,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        // the group-by
+        NestedTupleSourceRuntimeFactory nts = new NestedTupleSourceRuntimeFactory();
+        RecordDescriptor ntsDesc = sortDesc;
+        AggregateRuntimeFactory agg = new AggregateRuntimeFactory(
+                new IAggregateFunctionFactory[] { new TupleCountAggregateFunctionFactory() });
+        RecordDescriptor aggDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+        AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, agg },
+                new RecordDescriptor[] { ntsDesc, aggDesc });
+        NestedPlansAccumulatingAggregatorFactory npaaf = new NestedPlansAccumulatingAggregatorFactory(
+                new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new int[] {});
+        RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        PreclusteredGroupOperatorDescriptor gby = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 3 },
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, npaaf, gbyDesc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, gby,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        // the algebricks op.
+        IEvaluatorFactory cond = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
+                new ColumnAccessEvalFactory(0)); // Canadian customers
+        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(cond, new int[] { 1 },
+                BinaryBooleanInspectorImpl.INSTANCE);
+        RecordDescriptor selectDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "scanSortGbySelectWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                selectDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { select, writer }, new RecordDescriptor[] { selectDesc, null });
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sort, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, gby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), gby, 0, algebricksOp, 0);
+        spec.addRoot(algebricksOp);
+
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("9", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void scanHashGbySelectWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        // the scanner
+        FileSplit[] fileSplits = new FileSplit[1];
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl")));
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE };
+        FileScanOperatorDescriptor scanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanner,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        // the group-by
+        RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(new int[] { 3 },
+                new IBinaryHashFunctionFactory[] { IntegerBinaryHashFunctionFactory.INSTANCE });
+        IAggregateFunctionFactory[] aggFuns = new IAggregateFunctionFactory[] { new TupleCountAggregateFunctionFactory() };
+        IAccumulatingAggregatorFactory aggFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFuns,
+                new int[] { 3 }, new int[] {});
+        HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(spec, new int[] { 3 }, tpcf,
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, aggFactory, gbyDesc, 1024);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, gby,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        // the algebricks op.
+        IEvaluatorFactory cond = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
+                new ColumnAccessEvalFactory(0)); // Canadian customers
+        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(cond, new int[] { 1 },
+                BinaryBooleanInspectorImpl.INSTANCE);
+        RecordDescriptor selectDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "scanHashGbySelectWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                selectDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { select, writer }, new RecordDescriptor[] { selectDesc, null });
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, gby, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), gby, 0, algebricksOp, 0);
+        spec.addRoot(algebricksOp);
+
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("9", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void etsUnnestRunningaggregateWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+        RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+        IUnnestingFunctionFactory aggregFactory = new IntArrayUnnester(new int[] { 100, 200, 300 });
+        UnnestRuntimeFactory unnest = new UnnestRuntimeFactory(0, aggregFactory, new int[] { 0 });
+        RecordDescriptor unnestDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        RunningAggregateRuntimeFactory ragg = new RunningAggregateRuntimeFactory(new int[] { 1 },
+                new IRunningAggregateFunctionFactory[] { new TupleCountRunningAggregateFunctionFactory() }, new int[] {
+                        0, 1 });
+        RecordDescriptor raggDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "etsUnnestRunningaggregateWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 1 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                raggDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+                new IPushRuntimeFactory[] { ets, unnest, ragg, writer }, new RecordDescriptor[] { etsDesc, unnestDesc,
+                        raggDesc, null });
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("123", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void etsAssignScriptWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
+        IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
+
+        EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+        RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+        AssignRuntimeFactory assign = new AssignRuntimeFactory(new int[] { 0, 1 }, new IEvaluatorFactory[] { const1,
+                const2 }, new int[] { 0, 1 });
+        RecordDescriptor assignDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        IValueParserFactory[] valueParsers = { IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE };
+
+        String osname = System.getProperty("os.name");
+        String command;
+        if (osname.equals("Linux")) {
+            command = "bash target/testscripts/idscript";
+        } else if (osname.startsWith("Windows")) {
+            command = "target\\testscripts\\idscript.cmd";
+        } else {
+            // don't know how to test
+            return;
+        }
+
+        StringStreamingRuntimeFactory script = new StringStreamingRuntimeFactory(command, new IPrinterFactory[] {
+                IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE }, ' ',
+                new DelimitedDataTupleParserFactory(valueParsers, ' '));
+        RecordDescriptor scriptDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "etsAssignScriptWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1 }, new IPrinterFactory[] {
+                IntegerPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE }, outFile,
+                PrinterBasedWriterFactory.INSTANCE, scriptDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+                new IPushRuntimeFactory[] { ets, assign, script, writer }, new RecordDescriptor[] { etsDesc,
+                        assignDesc, scriptDesc, null });
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("400; 3", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void scanSplitWrite() throws Exception {
+        final int outputArity = 2;
+
+        JobSpecification spec = new JobSpecification();
+
+        String inputFileName = "data/tpch0.001/customer.tbl";
+        File inputFile = new File(inputFileName);
+        File[] outputFile = new File[outputArity];
+        for (int i = 0; i < outputArity; i++) {
+            outputFile[i] = File.createTempFile("splitop", null);
+        }
+
+        FileSplit[] inputSplits = new FileSplit[] { new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID,
+                new FileReference(inputFile)) };
+
+        DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
+                new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
+        RecordDescriptor stringRec = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE, });
+
+        FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider(
+                inputSplits), stringParser, stringRec);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+        for (int i = 0; i < outputArity; i++) {
+            outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { new FileSplit(
+                    AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i])) });
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i],
+                    new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+        }
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+        for (int i = 0; i < outputArity; i++) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
+        }
+
+        for (int i = 0; i < outputArity; i++) {
+            spec.addRoot(outputOp[i]);
+        }
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        for (int i = 0; i < outputArity; i++) {
+            compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+        }
+    }
+
+    @Test
+    public void scanMicroSortWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        // the scanner
+        FileSplit[] fileSplits = new FileSplit[1];
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+                "data/tpch0.001/nation.tbl")));
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+        FileScanOperatorDescriptor scanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanner,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        // the algebricks op.
+        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 1 }, null,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, null);
+        RecordDescriptor sortDesc = scannerDesc;
+
+        String fileName = "scanMicroSortWrite.out";
+        String filePath = PATH_ACTUAL + SEPARATOR + fileName;
+        String resultFilePath = PATH_EXPECTED + SEPARATOR + fileName;
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0, 1, 2, 3 }, new IPrinterFactory[] {
+                IntegerPrinterFactory.INSTANCE, UTF8StringPrinterFactory.INSTANCE, IntegerPrinterFactory.INSTANCE,
+                UTF8StringPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE, sortDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { sort, writer }, new RecordDescriptor[] { sortDesc, null });
+
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, algebricksOp, 1);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, algebricksOp, 0);
+
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        compareFiles(filePath, resultFilePath);
+        outFile.delete();
+    }
+
+    @Test
+    public void etsAssignSubplanProjectWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        IntegerConstantEvalFactory const1 = new IntegerConstantEvalFactory(400);
+        IntegerConstantEvalFactory const2 = new IntegerConstantEvalFactory(3);
+
+        EmptyTupleSourceRuntimeFactory ets = new EmptyTupleSourceRuntimeFactory();
+        RecordDescriptor etsDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
+
+        AssignRuntimeFactory assign1 = new AssignRuntimeFactory(new int[] { 0 }, new IEvaluatorFactory[] { const1 },
+                new int[] { 0 });
+        RecordDescriptor assign1Desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        NestedTupleSourceRuntimeFactory nts = new NestedTupleSourceRuntimeFactory();
+
+        AssignRuntimeFactory assign2 = new AssignRuntimeFactory(new int[] { 1 },
+                new IEvaluatorFactory[] { new IntegerAddEvalFactory(new ColumnAccessEvalFactory(0), const2) },
+                new int[] { 0, 1 });
+        RecordDescriptor assign2Desc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        StreamProjectRuntimeFactory project1 = new StreamProjectRuntimeFactory(new int[] { 1 });
+        RecordDescriptor project1Desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, assign2, project1 },
+                new RecordDescriptor[] { assign1Desc, assign2Desc, project1Desc });
+
+        SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(pipeline,
+                new INullWriterFactory[] { NoopNullWriterFactory.INSTANCE }, assign1Desc, null);
+
+        RecordDescriptor subplanDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        StreamProjectRuntimeFactory project2 = new StreamProjectRuntimeFactory(new int[] { 1 });
+        RecordDescriptor project2Desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "etsAssignSubplanProjectWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                project2Desc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 0,
+                new IPushRuntimeFactory[] { ets, assign1, subplan, project2, writer }, new RecordDescriptor[] {
+                        etsDesc, assign1Desc, subplanDesc, project2Desc, null });
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp, DEFAULT_NODES);
+
+        spec.addRoot(algebricksOp);
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("403", buf.toString());
+        outFile.delete();
+    }
+
+    @Test
+    public void scanMicroSortGbySelectWrite() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        // the scanner
+        FileSplit[] fileSplits = new FileSplit[1];
+        fileSplits[0] = new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl")));
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(fileSplits);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE };
+        FileScanOperatorDescriptor scanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanner,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        // the sort (by nation id)
+        RecordDescriptor sortDesc = scannerDesc;
+        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 3 }, null,
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, null);
+
+        // the group-by
+        NestedTupleSourceRuntimeFactory nts = new NestedTupleSourceRuntimeFactory();
+        RecordDescriptor ntsDesc = sortDesc;
+        AggregateRuntimeFactory agg = new AggregateRuntimeFactory(
+                new IAggregateFunctionFactory[] { new TupleCountAggregateFunctionFactory() });
+        RecordDescriptor aggDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+        AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, agg },
+                new RecordDescriptor[] { ntsDesc, aggDesc });
+        NestedPlansAccumulatingAggregatorFactory npaaf = new NestedPlansAccumulatingAggregatorFactory(
+                new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new int[] {});
+        RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        MicroPreClusteredGroupRuntimeFactory gby = new MicroPreClusteredGroupRuntimeFactory(new int[] { 3 },
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, npaaf, sortDesc, gbyDesc,
+                null);
+
+        // the algebricks op.
+        IEvaluatorFactory cond = new IntegerEqualsEvalFactory(new IntegerConstantEvalFactory(3),
+                new ColumnAccessEvalFactory(0)); // Canadian customers
+        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(cond, new int[] { 1 },
+                BinaryBooleanInspectorImpl.INSTANCE);
+        RecordDescriptor selectDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+
+        String filePath = PATH_ACTUAL + SEPARATOR + "scanSortGbySelectWrite.out";
+        File outFile = new File(filePath);
+        SinkWriterRuntimeFactory writer = new SinkWriterRuntimeFactory(new int[] { 0 },
+                new IPrinterFactory[] { IntegerPrinterFactory.INSTANCE }, outFile, PrinterBasedWriterFactory.INSTANCE,
+                selectDesc);
+
+        AlgebricksMetaOperatorDescriptor algebricksOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] { sort, gby, select, writer }, new RecordDescriptor[] { sortDesc, gbyDesc,
+                        selectDesc, null });
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, algebricksOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, algebricksOp, 0);
+        spec.addRoot(algebricksOp);
+
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+        StringBuilder buf = new StringBuilder();
+        readFileToString(outFile, buf);
+        Assert.assertEquals("9", buf.toString());
+        outFile.delete();
+    }
+
+    private static void readFileToString(File file, StringBuilder buf) throws Exception {
+        BufferedReader result = new BufferedReader(new FileReader(file));
+        boolean first = true;
+        while (true) {
+            String s = result.readLine();
+            if (s == null) {
+                break;
+            } else {
+                if (!first) {
+                    first = false;
+                    buf.append('\n');
+                }
+                buf.append(s);
+            }
+        }
+        result.close();
+    }
+
+    public void compareFiles(String fileNameA, String fileNameB) throws IOException {
+        BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
+        BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
+
+        String lineA, lineB;
+        while ((lineA = fileA.readLine()) != null) {
+            lineB = fileB.readLine();
+            Assert.assertEquals(lineA, lineB);
+        }
+        Assert.assertNull(fileB.readLine());
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
new file mode 100644
index 0000000..73dc16d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
@@ -0,0 +1,97 @@
+package edu.uci.ics.hyracks.algebricks.tests.tools;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.algebricks.core.utils.WriteValueTools;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ByteArrayAccessibleOutputStream;
+
+public class WriteValueTest {
+
+    private ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
+
+    @Test
+    public void writeIntegers() throws Exception {
+        writeIntTest(6);
+        writeIntTest(1234);
+        writeIntTest(-1234);
+        writeIntTest(Integer.MAX_VALUE);
+        writeIntTest(Integer.MAX_VALUE - 1);
+        writeIntTest(Integer.MIN_VALUE);
+        writeIntTest(Integer.MIN_VALUE + 1);
+    }
+
+    @Test
+    public void writeLongs() throws Exception {
+        writeLongTest(Integer.MAX_VALUE);
+        writeLongTest(Integer.MAX_VALUE - 1);
+        writeLongTest(Integer.MIN_VALUE);
+        writeLongTest(Integer.MIN_VALUE + 1);
+        writeLongTest(0L);
+        writeLongTest(1234567890L);
+        writeLongTest(-1234567890L);
+    }
+
+    @Test
+    public void writeUTF8Strings() throws Exception {
+        ByteArrayAccessibleOutputStream interm = new ByteArrayAccessibleOutputStream();
+        DataOutput dout = new DataOutputStream(interm);
+        writeUTF8Test("abcdefABCDEF", dout, interm);
+        writeUTF8Test("šťžľčěďňřůĺ", dout, interm);
+        writeUTF8Test("Ă㪺Ţţ", dout, interm);
+    }
+
+    private void writeIntTest(int i) throws Exception {
+        baaos.reset();
+        WriteValueTools.writeInt(i, baaos);
+        byte[] goal = Integer.toString(i).getBytes();
+        if (baaos.size() != goal.length) {
+            throw new Exception("Expecting to write " + i + " in " + goal.length + " bytes, but found " + baaos.size()
+                    + " bytes.");
+        }
+        for (int k = 0; k < goal.length; k++) {
+            if (goal[k] != baaos.getByteArray()[k]) {
+                throw new Exception("Expecting to write " + i + " as " + goal + ", but found " + baaos.getByteArray()
+                        + " instead.");
+            }
+        }
+    }
+
+    private void writeLongTest(long x) throws Exception {
+        baaos.reset();
+        WriteValueTools.writeLong(x, baaos);
+        byte[] goal = Long.toString(x).getBytes();
+        if (baaos.size() != goal.length) {
+            throw new Exception("Expecting to write " + x + " in " + goal.length + " bytes, but found " + baaos.size()
+                    + " bytes.");
+        }
+        for (int k = 0; k < goal.length; k++) {
+            if (goal[k] != baaos.getByteArray()[k]) {
+                throw new Exception("Expecting to write " + x + " as " + goal + ", but found " + baaos.getByteArray()
+                        + " instead.");
+            }
+        }
+    }
+
+    private void writeUTF8Test(String str, DataOutput dout, ByteArrayAccessibleOutputStream interm) throws Exception {
+        interm.reset();
+        dout.writeUTF(str);
+        baaos.reset();
+        WriteValueTools.writeUTF8String(interm.getByteArray(), 0, interm.size(), baaos);
+        byte[] b = str.getBytes();
+        if (baaos.size() != b.length + 2) {
+            throw new Exception("Expecting to write " + b + " in " + b.length + " bytes, but found " + baaos.size()
+                    + " bytes.");
+        }
+        if (baaos.getByteArray()[0] != '\"' || baaos.getByteArray()[baaos.size() - 1] != '\"') {
+            throw new Exception("Missing quotes.");
+        }
+        for (int k = 0; k < b.length; k++) {
+            if (b[k] != baaos.getByteArray()[k + 1]) {
+                throw new Exception("Expecting to write " + b + ", but found " + baaos.getByteArray() + " instead.");
+            }
+        }
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
new file mode 100644
index 0000000..d6c2165
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.tests.util;
+
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class AlgebricksHyracksIntegrationUtil {
+
+    public static final String NC1_ID = "nc1";
+    public static final String NC2_ID = "nc2";
+
+    public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
+
+    public static final int TEST_HYRACKS_CC_PORT = 4322;
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.port = TEST_HYRACKS_CC_PORT;
+        // ccConfig.useJOL = true;
+        cc = new ClusterControllerService(ccConfig);
+        cc.start();
+
+        NCConfig ncConfig1 = new NCConfig();
+        ncConfig1.ccHost = "localhost";
+        ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig1.dataIPAddress = "127.0.0.1";
+        ncConfig1.nodeId = NC1_ID;
+        nc1 = new NodeControllerService(ncConfig1);
+        nc1.start();
+
+        NCConfig ncConfig2 = new NCConfig();
+        ncConfig2.ccHost = "localhost";
+        ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
+        ncConfig2.dataIPAddress = "127.0.0.1";
+        ncConfig2.nodeId = NC2_ID;
+        nc2 = new NodeControllerService(ncConfig2);
+        nc2.start();
+
+        hcc = new HyracksLocalConnection(cc);
+        hcc.createApplication(AlgebricksConfig.HYRACKS_APP_NAME, null);
+    }
+
+    public static void deinit() throws Exception {
+        nc2.stop();
+        nc1.stop();
+        cc.stop();
+    }
+
+    public static void runJob(JobSpecification spec) throws Exception {
+        JobId jobId = hcc.createJob(AlgebricksConfig.HYRACKS_APP_NAME, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
+        cc.start(jobId);
+        AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
+        cc.waitForCompletion(jobId);
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/resources/results/scanMicroSortWrite.out b/hyracks-algebricks/hyracks-algebricks-tests/src/test/resources/results/scanMicroSortWrite.out
new file mode 100644
index 0000000..1c0fd6a
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/resources/results/scanMicroSortWrite.out
@@ -0,0 +1,25 @@
+0; "ALGERIA"; 0; " haggle. carefully final deposits detect slyly agai"
+1; "ARGENTINA"; 1; "al foxes promise slyly according to the regular accounts. bold requests alon"
+2; "BRAZIL"; 1; "y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special "
+3; "CANADA"; 1; "eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold"
+18; "CHINA"; 2; "c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos"
+4; "EGYPT"; 4; "y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d"
+5; "ETHIOPIA"; 0; "ven packages wake quickly. regu"
+6; "FRANCE"; 3; "refully final requests. regular, ironi"
+7; "GERMANY"; 3; "l platelets. regular accounts x-ray: unusual, regular acco"
+8; "INDIA"; 2; "ss excuses cajole slyly across the packages. deposits print aroun"
+9; "INDONESIA"; 2; " slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull"
+10; "IRAN"; 4; "efully alongside of the slyly final dependencies. "
+11; "IRAQ"; 4; "nic deposits boost atop the quickly final requests? quickly regula"
+12; "JAPAN"; 2; "ously. final, express gifts cajole a"
+13; "JORDAN"; 4; "ic deposits are blithely about the carefully regular pa"
+14; "KENYA"; 0; " pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t"
+15; "MOROCCO"; 0; "rns. blithely bold courts among the closely regular packages use furiously bold platelets?"
+16; "MOZAMBIQUE"; 0; "s. ironic, unusual asymptotes wake blithely r"
+17; "PERU"; 1; "platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun"
+19; "ROMANIA"; 3; "ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account"
+22; "RUSSIA"; 3; " requests against the platelets use never according to the quickly regular pint"
+20; "SAUDI ARABIA"; 4; "ts. silent requests haggle. closely express packages sleep across the blithely"
+23; "UNITED KINGDOM"; 3; "eans boost carefully special requests. accounts are. carefull"
+24; "UNITED STATES"; 1; "y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be"
+21; "VIETNAM"; 2; "hely enticingly express accounts. even, final "