Hoisted algebricks as a top-level project in fullstack

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1968 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-runtime/pom.xml b/algebricks/algebricks-runtime/pom.xml
new file mode 100644
index 0000000..ee917c2
--- /dev/null
+++ b/algebricks/algebricks-runtime/pom.xml
@@ -0,0 +1,51 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>algebricks-runtime</artifactId>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>algebricks</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  <dependency>
+  	<groupId>edu.uci.ics.hyracks</groupId>
+  	<artifactId>hyracks-storage-am-btree</artifactId>
+  	<version>0.2.2-SNAPSHOT</version>
+  </dependency>
+  <dependency>
+  	<groupId>edu.uci.ics.hyracks</groupId>
+  	<artifactId>hyracks-storage-am-rtree</artifactId>
+  	<version>0.2.2-SNAPSHOT</version>
+  </dependency>
+  <dependency>
+  	<groupId>edu.uci.ics.hyracks</groupId>
+  	<artifactId>hyracks-dataflow-std</artifactId>
+  	<version>0.2.2-SNAPSHOT</version>
+  </dependency>
+  <dependency>
+  	<groupId>edu.uci.ics.hyracks</groupId>
+  	<artifactId>algebricks-common</artifactId>
+  	<version>0.2.2-SNAPSHOT</version>
+  </dependency>
+  <dependency>
+  	<groupId>edu.uci.ics.hyracks</groupId>
+  	<artifactId>algebricks-data</artifactId>
+  	<version>0.2.2-SNAPSHOT</version>
+  </dependency>
+  </dependencies>
+</project>
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
new file mode 100644
index 0000000..0658d62
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.hyracks.algebricks.runtime.aggregators;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class TupleCountAggregateFunctionFactory implements IAggregateEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
+        final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
+        return new IAggregateEvaluator() {
+
+            int cnt;
+
+            @Override
+            public void step(IFrameTupleReference tuple) throws AlgebricksException {
+                ++cnt;
+            }
+
+            @Override
+            public void init() throws AlgebricksException {
+                cnt = 0;
+            }
+
+            @Override
+            public void finish(IPointable result) throws AlgebricksException {
+                try {
+                    abvs.reset();
+                    abvs.getDataOutput().writeInt(cnt);
+                    result.set(abvs);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
new file mode 100644
index 0000000..fa73f6c
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.aggregators;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class TupleCountRunningAggregateFunctionFactory implements IRunningAggregateEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException {
+        final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
+        return new IRunningAggregateEvaluator() {
+
+            int cnt;
+
+            @Override
+            public void step(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                ++cnt;
+                try {
+                    abvs.reset();
+                    abvs.getDataOutput().writeInt(cnt);
+                    result.set(abvs);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+
+            @Override
+            public void init() throws AlgebricksException {
+                cnt = 0;
+            }
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
new file mode 100644
index 0000000..50f014b
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class AlgebricksPipeline implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final IPushRuntimeFactory[] runtimeFactories;
+    private final RecordDescriptor[] recordDescriptors;
+
+    public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] recordDescriptors) {
+        this.runtimeFactories = runtimeFactories;
+        this.recordDescriptors = recordDescriptors;
+        // this.projectedColumns = projectedColumns;
+    }
+
+    public IPushRuntimeFactory[] getRuntimeFactories() {
+        return runtimeFactories;
+    }
+
+    public RecordDescriptor[] getRecordDescriptors() {
+        return recordDescriptors;
+    }
+
+    public int getOutputWidth() {
+        return recordDescriptors[recordDescriptors.length - 1].getFieldCount();
+    }
+
+    // public int[] getProjectedColumns() {
+    // return projectedColumns;
+    // }
+
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
new file mode 100644
index 0000000..9b014ee
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface IAggregateEvaluator {
+    public void init() throws AlgebricksException;
+
+    public void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+    public void finish(IPointable result) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
new file mode 100644
index 0000000..83bfbf7
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public interface IAggregateEvaluatorFactory extends Serializable {
+    public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
new file mode 100644
index 0000000..c768674
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface ICopyAggregateFunction {
+    /** should be called each time a new aggregate value is computed */
+    public void init() throws AlgebricksException;
+
+    public void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+    public void finish() throws AlgebricksException;
+
+    public void finishPartial() throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
new file mode 100644
index 0000000..702f392
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public interface ICopyAggregateFunctionFactory extends Serializable {
+    public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluator.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluator.java
new file mode 100644
index 0000000..58d7569
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluator.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface ICopyEvaluator {
+    public void evaluate(IFrameTupleReference tuple) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
new file mode 100644
index 0000000..649902c
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public interface ICopyEvaluatorFactory extends Serializable {
+    public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
new file mode 100644
index 0000000..3b14b28
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface ICopyRunningAggregateFunction {
+    public void init() throws AlgebricksException;
+
+    public void step(IFrameTupleReference tuple) throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
new file mode 100644
index 0000000..1065020
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public interface ICopyRunningAggregateFunctionFactory extends Serializable {
+    public ICopyRunningAggregateFunction createRunningAggregateFunction(IDataOutputProvider provider)
+            throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java
new file mode 100644
index 0000000..47aec7d
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface ICopySerializableAggregateFunction {
+    /**
+     * initialize the space occupied by internal state
+     * 
+     * @param state
+     * @throws AlgebricksException
+     * @return length of the intermediate state
+     */
+    public void init(DataOutput state) throws AlgebricksException;
+
+    /**
+     * update the internal state
+     * 
+     * @param tuple
+     * @param state
+     * @throws AlgebricksException
+     */
+    public void step(IFrameTupleReference tuple, byte[] data, int start, int len) throws AlgebricksException;
+
+    /**
+     * output the state to result
+     * 
+     * @param state
+     * @param result
+     * @throws AlgebricksException
+     */
+    public void finish(byte[] data, int start, int len, DataOutput result) throws AlgebricksException;
+
+    /**
+     * output the partial state to partial result
+     * 
+     * @param state
+     * @param partialResult
+     * @throws AlgebricksException
+     */
+    public void finishPartial(byte[] data, int start, int len, DataOutput partialResult) throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
new file mode 100644
index 0000000..69d56a8
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public interface ICopySerializableAggregateFunctionFactory extends Serializable {
+    public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
new file mode 100644
index 0000000..3443403
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface ICopyUnnestingFunction {
+    public void init(IFrameTupleReference tuple) throws AlgebricksException;
+
+    public boolean step() throws AlgebricksException;
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
new file mode 100644
index 0000000..e43da8b
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public interface ICopyUnnestingFunctionFactory extends Serializable {
+    public ICopyUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java
new file mode 100644
index 0000000..26fcc67
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public interface IPushRuntime extends IFrameWriter {
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
new file mode 100644
index 0000000..f1d01e6
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public interface IPushRuntimeFactory extends Serializable {
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluator.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluator.java
new file mode 100644
index 0000000..f76a33c
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface IRunningAggregateEvaluator {
+    public void init() throws AlgebricksException;
+
+    public void step(IFrameTupleReference tuple, IPointable result) throws AlgebricksException;
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
new file mode 100644
index 0000000..80f6511
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public interface IRunningAggregateEvaluatorFactory extends Serializable {
+    public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluator.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluator.java
new file mode 100644
index 0000000..e22097a
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluator.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface IScalarEvaluator {
+    public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
new file mode 100644
index 0000000..e40b96c
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public interface IScalarEvaluatorFactory extends Serializable {
+    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
new file mode 100644
index 0000000..f65192a
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface IUnnestingEvaluator {
+    public void init(IFrameTupleReference tuple) throws AlgebricksException;
+
+    public boolean step(IPointable result) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
new file mode 100644
index 0000000..8bca2b9
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public interface IUnnestingEvaluatorFactory extends Serializable {
+    public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
new file mode 100644
index 0000000..8ab92ee
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.context;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+
+public class AsterixBTreeRegistry {
+
+    private HashMap<Integer, BTree> map = new HashMap<Integer, BTree>();
+    private Lock registryLock = new ReentrantLock();
+
+    public BTree get(int fileId) {
+        return map.get(fileId);
+    }
+
+    // TODO: not very high concurrency, but good enough for now
+    public void lock() {
+        registryLock.lock();
+    }
+
+    public void unlock() {
+        registryLock.unlock();
+    }
+
+    public void register(int fileId, BTree btree) {
+        map.put(fileId, btree);
+    }
+
+    public void unregister(int fileId) {
+        map.remove(fileId);
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
new file mode 100644
index 0000000..5f5d19c
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ColumnAccessEvalFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int fieldIndex;
+
+    public ColumnAccessEvalFactory(int fieldIndex) {
+        this.fieldIndex = fieldIndex;
+    }
+
+    @Override
+    public String toString() {
+        return "ColumnAccess(" + fieldIndex + ")";
+    }
+
+    @Override
+    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new ICopyEvaluator() {
+
+            private DataOutput out = output.getDataOutput();
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                byte[] buffer = tuple.getFieldData(fieldIndex);
+                int start = tuple.getFieldStart(fieldIndex);
+                int length = tuple.getFieldLength(fieldIndex);
+                try {
+                    out.write(buffer, start, length);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
new file mode 100644
index 0000000..c537c93
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ConstantEvalFactory implements ICopyEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private byte[] value;
+
+    public ConstantEvalFactory(byte[] value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "Constant";
+    }
+
+    @Override
+    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new ICopyEvaluator() {
+
+            private DataOutput out = output.getDataOutput();
+
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                try {
+                    out.write(value, 0, value.length);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
new file mode 100644
index 0000000..4f7c35a
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ConstantEvaluatorFactory implements IScalarEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private byte[] value;
+
+    public ConstantEvaluatorFactory(byte[] value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "Constant";
+    }
+
+    @Override
+    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
+        return new IScalarEvaluator() {
+            @Override
+            public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                result.set(value, 0, value.length);
+            }
+        };
+    }
+
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
new file mode 100644
index 0000000..29328ff
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class TupleFieldEvaluatorFactory implements IScalarEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final int fieldIndex;
+
+    public TupleFieldEvaluatorFactory(int fieldIndex) {
+        this.fieldIndex = fieldIndex;
+    }
+
+    @Override
+    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
+        return new IScalarEvaluator() {
+            @Override
+            public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                result.set(tuple.getFieldData(fieldIndex), tuple.getFieldStart(fieldIndex),
+                        tuple.getFieldLength(fieldIndex));
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
new file mode 100644
index 0000000..a975195
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    // private int[] outColumns;
+    private IAggregateEvaluatorFactory[] aggregFactories;
+
+    public AggregateRuntimeFactory(IAggregateEvaluatorFactory[] aggregFactories) {
+        super(null);
+        // this.outColumns = outColumns;
+        this.aggregFactories = aggregFactories;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("assign [");
+        for (int i = 0; i < aggregFactories.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append(aggregFactories[i]);
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            private IAggregateEvaluator[] aggregs = new IAggregateEvaluator[aggregFactories.length];
+            private IPointable result = VoidPointable.FACTORY.createPointable();
+            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
+
+            private boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                try {
+                    if (first) {
+                        first = false;
+                        initAccessAppendRef(ctx);
+                        for (int i = 0; i < aggregFactories.length; i++) {
+                            aggregs[i] = aggregFactories[i].createAggregateEvaluator(ctx);
+                        }
+                    }
+                    for (int i = 0; i < aggregFactories.length; i++) {
+                        aggregs[i].init();
+                    }
+                } catch (AlgebricksException e) {
+                    throw new HyracksDataException(e);
+                }
+
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    processTuple(tRef);
+                }
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                computeAggregate();
+                appendToFrameFromTupleBuilder(tupleBuilder);
+                super.close();
+            }
+
+            private void computeAggregate() throws HyracksDataException {
+                tupleBuilder.reset();
+                for (int f = 0; f < aggregs.length; f++) {
+                    try {
+                        aggregs[f].finish(result);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    tupleBuilder.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
+                }
+            }
+
+            private void processTuple(FrameTupleReference tupleRef) throws HyracksDataException {
+                for (int f = 0; f < aggregs.length; f++) {
+                    try {
+                        aggregs[f].step(tupleRef);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+        };
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..2ae6f72
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private AlgebricksPipeline[] subplans;
+    private int[] keyFieldIdx;
+    private int[] decorFieldIdx;
+
+    public NestedPlansAccumulatingAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx,
+            int[] decorFieldIdx) {
+        this.subplans = subplans;
+        this.keyFieldIdx = keyFieldIdx;
+        this.decorFieldIdx = decorFieldIdx;
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
+
+        final AggregatorOutput outputWriter = new AggregatorOutput(ctx.getFrameSize(), subplans, keyFieldIdx.length,
+                decorFieldIdx.length);
+        final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+        for (int i = 0; i < subplans.length; i++) {
+            try {
+                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+                tb.reset();
+                for (int i = 0; i < keyFieldIdx.length; ++i) {
+                    tb.addField(accessor, tIndex, keyFieldIdx[i]);
+                }
+                for (int i = 0; i < decorFieldIdx.length; ++i) {
+                    tb.addField(accessor, tIndex, decorFieldIdx[i]);
+                }
+                for (int i = 0; i < pipelines.length; ++i) {
+                    pipelines[i].open();
+                }
+
+                // aggregate the first tuple
+                for (int i = 0; i < pipelines.length; i++) {
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                }
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                // it only works if the output of the aggregator fits in one
+                // frame
+                for (int i = 0; i < pipelines.length; i++) {
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                }
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].close();
+                }
+                // outputWriter.writeTuple(appender);
+                tupleBuilder.reset();
+                ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+                byte[] data = tb.getByteArray();
+                int[] fieldEnds = tb.getFieldEndOffsets();
+                int start = 0;
+                int offset = 0;
+                for (int i = 0; i < fieldEnds.length; i++) {
+                    if (i > 0)
+                        start = fieldEnds[i - 1];
+                    offset = fieldEnds[i] - start;
+                    tupleBuilder.addField(data, start, offset);
+                }
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState();
+            }
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+        };
+    }
+
+    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        // plug the operators
+        IFrameWriter start = writer;
+        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+            IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
+            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            if (i > 0) {
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
+            } else {
+                // the nts has the same input and output rec. desc.
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+            }
+            start = newRuntime;
+        }
+        return start;
+    }
+
+    /**
+     * We suppose for now, that each subplan only produces one tuple.
+     */
+    private static class AggregatorOutput implements IFrameWriter {
+
+        // private ByteBuffer frame;
+        private FrameTupleAccessor[] tAccess;
+        private RecordDescriptor[] inputRecDesc;
+        private int inputIdx;
+        private ArrayTupleBuilder tb;
+        private AlgebricksPipeline[] subplans;
+
+        public AggregatorOutput(int frameSize, AlgebricksPipeline[] subplans, int numKeys, int numDecors) {
+            this.subplans = subplans;
+            // this.keyFieldIndexes = keyFieldIndexes;
+            int totalAggFields = 0;
+            this.inputRecDesc = new RecordDescriptor[subplans.length];
+            for (int i = 0; i < subplans.length; i++) {
+                RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
+                this.inputRecDesc[i] = rd[rd.length - 1];
+                totalAggFields += subplans[i].getOutputWidth();
+            }
+            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+
+            this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
+            for (int i = 0; i < inputRecDesc.length; i++) {
+                tAccess[i] = new FrameTupleAccessor(frameSize, inputRecDesc[i]);
+            }
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+
+        }
+
+        /**
+         * Since each pipeline only produces one tuple, this method is only
+         * called by the close method of the pipelines.
+         */
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            int tIndex = 0;
+            int w = subplans[inputIdx].getOutputWidth();
+            IFrameTupleAccessor accessor = tAccess[inputIdx];
+            accessor.reset(buffer);
+            for (int f = 0; f < w; f++) {
+                tb.addField(accessor, tIndex, f);
+            }
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            // clearFrame();
+        }
+
+        public void setInputIdx(int inputIdx) {
+            this.inputIdx = inputIdx;
+        }
+
+        public ArrayTupleBuilder getTupleBuilder() {
+            return tb;
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+        }
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..4d5d3d6
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -0,0 +1,147 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+    private ICopySerializableAggregateFunctionFactory[] aggFactories;
+
+    public SerializableAggregatorDescriptorFactory(ICopySerializableAggregateFunctionFactory[] aggFactories) {
+        this.aggFactories = aggFactories;
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        final int[] keys = keyFields;
+
+        /**
+         * one IAggregatorDescriptor instance per Gby operator
+         */
+        return new IAggregatorDescriptor() {
+            private FrameTupleReference ftr = new FrameTupleReference();
+            private ICopySerializableAggregateFunction[] aggs = new ICopySerializableAggregateFunction[aggFactories.length];
+            private int offsetFieldIndex = keys.length;
+            private int stateFieldLength[] = new int[aggFactories.length];
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState();
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                DataOutput output = tb.getDataOutput();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        int begin = tb.getSize();
+                        if (aggs[i] == null) {
+                            aggs[i] = aggFactories[i].createAggregateFunction();
+                        }
+                        aggs[i].init(output);
+                        tb.addFieldEndOffset();
+                        stateFieldLength[i] = tb.getSize() - begin;
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+
+                // doing initial aggregate
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        byte[] data = tb.getByteArray();
+                        int prevFieldPos = i + keys.length - 1;
+                        int start = prevFieldPos >= 0 ? tb.getFieldEndOffsets()[prevFieldPos] : 0;
+                        aggs[i].step(ftr, data, start, stateFieldLength[i]);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                ftr.reset(accessor, tIndex);
+                int stateTupleStart = stateAccessor.getTupleStartOffset(stateTupleIndex);
+                int fieldSlotLength = stateAccessor.getFieldSlotsLength();
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        byte[] data = stateAccessor.getBuffer().array();
+                        int start = stateAccessor.getFieldStartOffset(stateTupleIndex, i + keys.length)
+                                + stateTupleStart + fieldSlotLength;
+                        aggs[i].step(ftr, data, start, stateFieldLength[i]);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                byte[] data = accessor.getBuffer().array();
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
+                int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
+                int start = refOffset;
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        aggs[i].finishPartial(data, start, stateFieldLength[i], tb.getDataOutput());
+                        start += stateFieldLength[i];
+                        tb.addFieldEndOffset();
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                byte[] data = accessor.getBuffer().array();
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
+                int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
+                int start = refOffset;
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        aggs[i].finish(data, start, stateFieldLength[i], tb.getDataOutput());
+                        start += stateFieldLength[i];
+                        tb.addFieldEndOffset();
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public void close() {
+                reset();
+            }
+
+        };
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..83925cc
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private IAggregateEvaluatorFactory[] aggFactories;
+
+    public SimpleAlgebricksAccumulatingAggregatorFactory(IAggregateEvaluatorFactory[] aggFactories, int[] keys,
+            int[] fdColumns) {
+        this.aggFactories = aggFactories;
+    }
+
+    @Override
+    public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+
+        return new IAggregatorDescriptor() {
+
+            private FrameTupleReference ftr = new FrameTupleReference();
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
+
+                // initialize aggregate functions
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].init();
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].step(ftr);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].step(ftr);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i].finish(p);
+                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                IAggregateEvaluator[] agg = new IAggregateEvaluator[aggFactories.length];
+                for (int i = 0; i < agg.length; i++) {
+                    try {
+                        agg[i] = aggFactories[i].createAggregateEvaluator(ctx);
+                    } catch (AlgebricksException e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+                return new AggregateState(agg);
+            }
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+        };
+    }
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
new file mode 100644
index 0000000..cd9931b
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends AbstractOneInputOneOutputPushRuntime {
+
+    protected FrameTupleAppender appender;
+    protected ByteBuffer frame;
+    protected FrameTupleAccessor tAccess;
+    protected FrameTupleReference tRef;
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (!failed) {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+        }
+        writer.close();
+        appender.reset(frame, true);
+    }
+
+    protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException(
+                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+            }
+        }
+    }
+
+    protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
+        if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+                throw new IllegalStateException(
+                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
+            }
+        }
+    }
+
+    protected void appendTupleToFrame(int tIndex) throws HyracksDataException {
+        if (!appender.append(tAccess, tIndex)) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.append(tAccess, tIndex)) {
+                throw new IllegalStateException(
+                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendTupleToFrame).");
+            }
+        }
+    }
+
+    protected final void initAccessAppend(IHyracksTaskContext ctx) {
+        // if (allocFrame) {
+        frame = ctx.allocateFrame();
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(frame, true);
+        // }
+        tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+    }
+
+    protected final void initAccessAppendRef(IHyracksTaskContext ctx) {
+        initAccessAppend(ctx);
+        tRef = new FrameTupleReference();
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
new file mode 100644
index 0000000..22f7df6
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractOneInputOneOutputPushRuntime extends AbstractOneInputPushRuntime {
+
+    protected RecordDescriptor inputRecordDesc;
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.inputRecordDesc = recordDescriptor;
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
new file mode 100644
index 0000000..f2c1008
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    protected int[] projectionList;
+
+    public AbstractOneInputOneOutputRuntimeFactory(int[] projectionList) {
+        this.projectionList = projectionList;
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+        return createOneOutputPushRuntime(ctx);
+    }
+
+    public abstract AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx)
+            throws AlgebricksException;
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
new file mode 100644
index 0000000..e600264
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
+    protected IFrameWriter writer;
+    protected RecordDescriptor outputRecordDesc;
+    protected boolean failed;
+
+    @Override
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        this.writer = writer;
+        this.outputRecordDesc = recordDesc;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+        writer.fail();
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
new file mode 100644
index 0000000..0fbe51f
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractOneInputSinkPushRuntime implements IPushRuntime {
+    protected RecordDescriptor inputRecordDesc;
+
+    @Override
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        throw new IllegalStateException();
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.inputRecordDesc = recordDescriptor;
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
new file mode 100644
index 0000000..1d23d14
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractOneInputSourcePushRuntime extends AbstractOneInputPushRuntime {
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
new file mode 100644
index 0000000..f175367
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class SinkRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public SinkRuntimeFactory() {
+    }
+
+    @Override
+    public String toString() {
+        return "sink";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+        return new AbstractOneInputSinkPushRuntime() {
+
+            @Override
+            public void open() throws HyracksDataException {
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+            }
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
new file mode 100644
index 0000000..388e6c9
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+public class MicroPreClusteredGroupRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final int[] groupFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private final RecordDescriptor inRecordDesc;
+    private final RecordDescriptor outRecordDesc;
+
+    public MicroPreClusteredGroupRuntimeFactory(int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDesc, int[] projectionList) {
+        super(projectionList);
+        // Obs: the projection list is currently ignored.
+        if (projectionList != null) {
+            throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
+        }
+        this.groupFields = groupFields;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        this.inRecordDesc = inRecordDesc;
+        this.outRecordDesc = outRecordDesc;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        try {
+            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+            for (int i = 0; i < comparatorFactories.length; ++i) {
+                comparators[i] = comparatorFactories[i].createBinaryComparator();
+            }
+            final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+                    outRecordDesc, groupFields, groupFields);
+            final ByteBuffer copyFrame = ctx.allocateFrame();
+            final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+            copyFrameAccessor.reset(copyFrame);
+            ByteBuffer outFrame = ctx.allocateFrame();
+            final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+            appender.reset(outFrame, true);
+
+            return new AbstractOneInputOneOutputPushRuntime() {
+
+                private PreclusteredGroupWriter pgw;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
+                            outRecordDesc, writer);
+                    pgw.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    pgw.nextFrame(buffer);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    pgw.fail();
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    pgw.close();
+                }
+            };
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
new file mode 100644
index 0000000..f2659f5
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import java.nio.ByteBuffer;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    // array of factories for building the local runtime pipeline
+    private final AlgebricksPipeline pipeline;
+
+    public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
+            IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
+        super(spec, inputArity, outputArity);
+        if (outputArity == 1) {
+            this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
+        }
+        this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
+    }
+
+    public AlgebricksPipeline getPipeline() {
+        return pipeline;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject json = super.toJSON();
+        json.put("micro-operators", pipeline.getRuntimeFactories());
+        return json;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Asterix { \n");
+        for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+            sb.append("  " + f.toString() + ";\n");
+        }
+        sb.append("}");
+        // sb.append(super.getInputArity());
+        // sb.append(";");
+        // sb.append(super.getOutputArity());
+        // sb.append(";");
+        return sb.toString();
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        if (inputArity == 0) {
+            return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        } else {
+            return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        }
+    }
+
+    private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+            public void initialize() throws HyracksDataException {
+                IFrameWriter startOfPipeline;
+                RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+                        : null;
+
+                PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
+                        pipelineOutputRecordDescriptor);
+                try {
+                    startOfPipeline = pa.assemblePipeline(writer, ctx);
+                } catch (AlgebricksException e) {
+                    throw new HyracksDataException(e);
+                }
+                startOfPipeline.open();
+                startOfPipeline.close();
+            }
+        };
+    }
+
+    private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+            private IFrameWriter startOfPipeline;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (startOfPipeline == null) {
+                    RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+                            : null;
+                    RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
+                            AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
+                    PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
+                            pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
+                    try {
+                        startOfPipeline = pa.assemblePipeline(writer, ctx);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                startOfPipeline.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                startOfPipeline.nextFrame(buffer);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                startOfPipeline.close();
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                startOfPipeline.fail();
+            }
+        };
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
new file mode 100644
index 0000000..92410c2
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class PipelineAssembler {
+
+    // array of factories for building the local runtime pipeline
+    private final RecordDescriptor pipelineInputRecordDescriptor;
+    private final RecordDescriptor pipelineOutputRecordDescriptor;
+
+    private final int inputArity;
+    private final int outputArity;
+    private final AlgebricksPipeline pipeline;
+
+    public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
+            RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) {
+        this.pipeline = pipeline;
+        this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
+        this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
+        this.inputArity = inputArity;
+        this.outputArity = outputArity;
+    }
+
+    public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws AlgebricksException {
+        // plug the operators
+        IFrameWriter start = writer;// this.writer;
+        for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
+            IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
+            if (i == pipeline.getRuntimeFactories().length - 1) {
+                if (outputArity == 1) {
+                    newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor);
+                }
+            } else {
+                newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
+            }
+            if (i > 0) {
+                newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]);
+            } else if (inputArity > 0) {
+                newRuntime.setInputRecordDescriptor(0, pipelineInputRecordDescriptor);
+            }
+            start = newRuntime;
+        }
+        return start;
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
new file mode 100644
index 0000000..f1ee570
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final AlgebricksPipeline pipeline;
+    private final RecordDescriptor inputRecordDesc;
+    private final INullWriterFactory[] nullWriterFactories;
+
+    public SubplanRuntimeFactory(AlgebricksPipeline pipeline, INullWriterFactory[] nullWriterFactories,
+            RecordDescriptor inputRecordDesc, int[] projectionList) {
+        super(projectionList);
+        this.pipeline = pipeline;
+        this.nullWriterFactories = nullWriterFactories;
+        this.inputRecordDesc = inputRecordDesc;
+        if (projectionList != null) {
+            throw new NotImplementedException();
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Subplan { \n");
+        for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+            sb.append("  " + f.toString() + ";\n");
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+
+        RecordDescriptor pipelineOutputRecordDescriptor = null;
+
+        final PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc,
+                pipelineOutputRecordDescriptor);
+        final INullWriter[] nullWriters = new INullWriter[nullWriterFactories.length];
+        for (int i = 0; i < nullWriterFactories.length; i++) {
+            nullWriters[i] = nullWriterFactories[i].createNullWriter();
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            /**
+             * Computes the outer product between a given tuple and the frames
+             * passed.
+             */
+            class TupleOuterProduct implements IFrameWriter {
+
+                private boolean smthWasWritten = false;
+                private IHyracksTaskContext hCtx = ctx;
+                private int frameSize = hCtx.getFrameSize();
+                private FrameTupleAccessor ta = new FrameTupleAccessor(frameSize,
+                        pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
+                private ArrayTupleBuilder tb = new ArrayTupleBuilder(nullWriters.length);
+
+                @Override
+                public void open() throws HyracksDataException {
+                    smthWasWritten = false;
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    ta.reset(buffer);
+                    int nTuple = ta.getTupleCount();
+                    for (int t = 0; t < nTuple; t++) {
+                        if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
+                            FrameUtils.flushFrame(frame, writer);
+                            appender.reset(frame, true);
+                            if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
+                                throw new IllegalStateException("Could not write frame.");
+                            }
+                        }
+                    }
+                    smthWasWritten = true;
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    if (!smthWasWritten) {
+                        // the case when we need to write nulls
+                        appendNullsToTuple();
+                        appendToFrameFromTupleBuilder(tb);
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    writer.fail();
+                }
+
+                private void appendNullsToTuple() throws HyracksDataException {
+                    tb.reset();
+                    int n0 = tRef.getFieldCount();
+                    for (int f = 0; f < n0; f++) {
+                        tb.addField(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), f);
+                    }
+                    DataOutput dos = tb.getDataOutput();
+                    for (int i = 0; i < nullWriters.length; i++) {
+                        nullWriters[i].writeNull(dos);
+                        tb.addFieldEndOffset();
+                    }
+                }
+
+            }
+
+            IFrameWriter endPipe = new TupleOuterProduct();
+
+            NestedTupleSourceRuntime startOfPipeline = (NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx);
+
+            boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    first = false;
+                    initAccessAppendRef(ctx);
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    startOfPipeline.writeTuple(buffer, t);
+                    startOfPipeline.open();
+                    startOfPipeline.close();
+                }
+            }
+        };
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
new file mode 100644
index 0000000..fd2710d
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorter;
+
+public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] sortFields;
+    private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    private IBinaryComparatorFactory[] comparatorFactories;
+
+    public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
+        super(projectionList);
+        // Obs: the projection list is currently ignored.
+        if (projectionList != null) {
+            throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
+        }
+        this.sortFields = sortFields;
+        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.comparatorFactories = comparatorFactories;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+
+        return new AbstractOneInputOneOutputPushRuntime() {
+
+            FrameSorter frameSorter = null;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (frameSorter == null) {
+                    frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+                            outputRecordDesc);
+                }
+                frameSorter.reset();
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                frameSorter.insertFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                frameSorter.sortFrames();
+                frameSorter.flushFrames(writer);
+                writer.close();
+            }
+        };
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
new file mode 100644
index 0000000..e47384d
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private int[] outColumns;
+    private IScalarEvaluatorFactory[] evalFactories;
+
+    /**
+     * @param outColumns
+     *            a sorted array of columns into which the result is written to
+     * @param evalFactories
+     * @param projectionList
+     *            an array of columns to be projected
+     */
+
+    public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList) {
+        super(projectionList);
+        this.outColumns = outColumns;
+        this.evalFactories = evalFactories;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("assign [");
+        for (int i = 0; i < outColumns.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append(outColumns[i]);
+        }
+        sb.append("] := [");
+        for (int i = 0; i < evalFactories.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append(evalFactories[i]);
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        final int[] projectionToOutColumns = new int[projectionList.length];
+        for (int j = 0; j < projectionList.length; j++) {
+            projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable result = VoidPointable.FACTORY.createPointable();
+            private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
+            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            private boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    initAccessAppendRef(ctx);
+                    first = false;
+                    int n = evalFactories.length;
+                    for (int i = 0; i < n; i++) {
+                        try {
+                            eval[i] = evalFactories[i].createScalarEvaluator(ctx);
+                        } catch (AlgebricksException ae) {
+                            throw new HyracksDataException(ae);
+                        }
+                    }
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    produceTuple(tupleBuilder, tAccess, t, tRef);
+                    appendToFrameFromTupleBuilder(tupleBuilder);
+                }
+            }
+
+            private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    FrameTupleReference tupleRef) throws HyracksDataException {
+                tb.reset();
+                for (int f = 0; f < projectionList.length; f++) {
+                    int k = projectionToOutColumns[f];
+                    if (k >= 0) {
+                        try {
+                            eval[k].evaluate(tupleRef, result);
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                        tb.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
+                    } else {
+                        tb.addField(accessor, tIndex, projectionList[f]);
+                    }
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+        };
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
new file mode 100644
index 0000000..dbe1ab1
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSourcePushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public EmptyTupleSourceRuntimeFactory() {
+    }
+
+    @Override
+    public String toString() {
+        return "ets";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) {
+        return new AbstractOneInputSourcePushRuntime() {
+
+            private ByteBuffer frame = ctx.allocateFrame();
+            private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
+            private FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+                appender.reset(frame, true);
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    throw new IllegalStateException();
+                }
+                FrameUtils.flushFrame(frame, writer);
+                writer.close();
+            }
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
new file mode 100644
index 0000000..42724f7
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public NestedTupleSourceRuntimeFactory() {
+    }
+
+    @Override
+    public String toString() {
+        return "nts";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+        return new NestedTupleSourceRuntime(ctx);
+    }
+
+    public static class NestedTupleSourceRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+        public NestedTupleSourceRuntime(IHyracksTaskContext ctx) {
+            initAccessAppend(ctx);
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            writer.open();
+        }
+
+        public void writeTuple(ByteBuffer inputBuffer, int tIndex) throws HyracksDataException {
+            tAccess.reset(inputBuffer);
+            appendTupleToFrame(tIndex);
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            throw new IllegalStateException();
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+    }
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
new file mode 100644
index 0000000..a54c96d
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -0,0 +1,160 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static int NO_DEFAULT_BRANCH = -1;
+    
+    private final ICopyEvaluatorFactory[] evalFactories;
+    private final IBinaryBooleanInspector boolInspector;
+    private final int defaultBranchIndex;
+    
+    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[] evalFactories,
+            IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor rDesc) {
+        super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
+        for (int i = 0; i < evalFactories.length; i++) {
+            recordDescriptors[i] = rDesc;
+        }
+        this.evalFactories = evalFactories;
+        this.boolInspector = boolInspector;
+        this.defaultBranchIndex = defaultBranchIndex;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputOperatorNodePushable() {
+            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+            private final ByteBuffer[] writeBuffers = new ByteBuffer[outputArity];
+            private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
+            private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
+            private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            private final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inOutRecDesc);
+            private final FrameTupleReference frameTuple = new FrameTupleReference();
+            
+            private final FrameTupleAppender tupleAppender = new FrameTupleAppender(ctx.getFrameSize());
+            private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
+            private final DataOutput tupleDos = tupleBuilder.getDataOutput();
+            
+            @Override
+            public void close() throws HyracksDataException {
+                // Flush (possibly not full) buffers that have data, and close writers.
+                for (int i = 0; i < outputArity; i++) {
+                    tupleAppender.reset(writeBuffers[i], false);
+                    if (tupleAppender.getTupleCount() > 0) {
+                        FrameUtils.flushFrame(writeBuffers[i], writers[i]);
+                    }
+                    writers[i].close();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.fail();
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                accessor.reset(buffer);
+                int tupleCount = accessor.getTupleCount();
+                for (int i = 0; i < tupleCount; i++) {
+                    frameTuple.reset(accessor, i);
+                    boolean found = false;
+                    for (int j = 0; j < evals.length; j++) {
+                        try {
+                        	evalBuf.reset();
+                            evals[j].evaluate(frameTuple);
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                        found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1);
+                        if (found) {
+                        	copyAndAppendTuple(j);
+                        	break;
+                        }
+                    }
+                    // Optionally write to default output branch.
+                    if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
+                    	copyAndAppendTuple(defaultBranchIndex);
+                    }
+                }
+            }
+
+            private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
+            	// Copy tuple into tuple builder.
+                try {
+                	tupleBuilder.reset();
+                    for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+                        tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
+                                frameTuple.getFieldLength(i));
+                        tupleBuilder.addFieldEndOffset();
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+                // Append to frame.
+                tupleAppender.reset(writeBuffers[outputIndex], false);
+                if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                    FrameUtils.flushFrame(writeBuffers[outputIndex], writers[outputIndex]);
+                    tupleAppender.reset(writeBuffers[outputIndex], true);
+                    if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+            
+            @Override
+            public void open() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.open();
+                }
+                // Create write buffers.
+                for (int i = 0; i < outputArity; i++) {
+                    writeBuffers[i] = ctx.allocateFrame();
+                    // Make sure to clear all buffers, since we are reusing the tupleAppender.
+                    tupleAppender.reset(writeBuffers[i], true);
+                }
+                // Create evaluators for partitioning.
+				try {
+					for (int i = 0; i < evalFactories.length; i++) {
+						evals[i] = evalFactories[i].createEvaluator(evalBuf);
+					}
+				} catch (AlgebricksException e) {
+					throw new HyracksDataException(e);
+				}
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                writers[index] = writer;
+            }
+        };
+    }
+}
+
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
new file mode 100644
index 0000000..5b53fc2
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class PrinterRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] printColumns;
+    private final IPrinterFactory[] printerFactories;
+    private final RecordDescriptor inputRecordDesc;
+
+    public PrinterRuntimeFactory(int[] printColumns, IPrinterFactory[] printerFactories,
+            RecordDescriptor inputRecordDesc) {
+        this.printColumns = printColumns;
+        this.printerFactories = printerFactories;
+        this.inputRecordDesc = inputRecordDesc;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        buf.append("print [");
+        for (int i = 0; i < printColumns.length; i++) {
+            if (i > 0) {
+                buf.append("; ");
+            }
+            buf.append(printColumns[i]);
+        }
+        buf.append("]");
+        return buf.toString();
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+        IAWriter w = PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, printerFactories,
+                inputRecordDesc);
+        return new SinkWriterRuntime(w, ctx, System.out, inputRecordDesc);
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
new file mode 100644
index 0000000..c15e050
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private int[] outColumns;
+    private IRunningAggregateEvaluatorFactory[] runningAggregates;
+
+    /**
+     * @param outColumns
+     *            a sorted array of columns into which the result is written to
+     * @param runningAggregates
+     * @param projectionList
+     *            an array of columns to be projected
+     */
+
+    public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] runningAggregates,
+            int[] projectionList) {
+        super(projectionList);
+        this.outColumns = outColumns;
+        this.runningAggregates = runningAggregates;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("running-aggregate [");
+        for (int i = 0; i < outColumns.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append(outColumns[i]);
+        }
+        sb.append("] := [");
+        for (int i = 0; i < runningAggregates.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append(runningAggregates[i]);
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        final int[] projectionToOutColumns = new int[projectionList.length];
+        for (int j = 0; j < projectionList.length; j++) {
+            projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
+            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            private boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                initAccessAppendRef(ctx);
+                if (first) {
+                    first = false;
+                    int n = runningAggregates.length;
+                    for (int i = 0; i < n; i++) {
+                        try {
+                            raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
+                            raggs[i].init();
+                        } catch (AlgebricksException ae) {
+                            throw new HyracksDataException(ae);
+                        }
+                    }
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    produceTuple(tupleBuilder, tAccess, t, tRef);
+                    appendToFrameFromTupleBuilder(tupleBuilder);
+                }
+            }
+
+            private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    FrameTupleReference tupleRef) throws HyracksDataException {
+                tb.reset();
+                for (int f = 0; f < projectionList.length; f++) {
+                    int k = projectionToOutColumns[f];
+                    if (k >= 0) {
+                        try {
+                            raggs[k].step(tupleRef, p);
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                        tb.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                    } else {
+                        tb.addField(accessor, tIndex, projectionList[f]);
+                    }
+                }
+            }
+
+        };
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
new file mode 100644
index 0000000..148f087
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
+
+    private final IHyracksTaskContext ctx;
+    private final PrintStream printStream;
+    private final IAWriter writer;
+    private RecordDescriptor inputRecordDesc;
+    private FrameTupleAccessor tAccess;
+    private boolean autoClose = false;
+    private boolean first = true;
+
+    public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
+            RecordDescriptor inputRecordDesc) {
+        this.writer = writer;
+        this.ctx = ctx;
+        this.printStream = printStream;
+        this.inputRecordDesc = inputRecordDesc;
+        this.tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+    }
+
+    public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
+            RecordDescriptor inputRecordDesc, boolean autoClose) {
+        this(writer, ctx, printStream, inputRecordDesc);
+        this.autoClose = autoClose;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (first) {
+            first = false;
+            tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+            try {
+                writer.init();
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            try {
+                writer.printTuple(tAccess, t);
+            } catch (AlgebricksException ae) {
+                throw new HyracksDataException(ae);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (autoClose) {
+            printStream.close();
+        }
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.inputRecordDesc = recordDescriptor;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
new file mode 100644
index 0000000..1aff1aa
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SinkWriterRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] fields;
+    private final IPrinterFactory[] printerFactories;
+    private final File outputFile;
+    private final RecordDescriptor inputRecordDesc;
+    private final IAWriterFactory writerFactory;
+
+    public SinkWriterRuntimeFactory(int[] fields, IPrinterFactory[] printerFactories, File outputFile,
+            IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc) {
+        this.fields = fields;
+        this.printerFactories = printerFactories;
+        this.outputFile = outputFile;
+        this.writerFactory = writerFactory;
+        this.inputRecordDesc = inputRecordDesc;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        buf.append("sink-write " + "[");
+        for (int i = 0; i < fields.length; i++) {
+            if (i > 0) {
+                buf.append("; ");
+            }
+            buf.append(fields[i]);
+        }
+        buf.append("] outputFile");
+        return buf.toString();
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+        PrintStream filePrintStream = null;
+        try {
+            filePrintStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
+        } catch (FileNotFoundException e) {
+            throw new AlgebricksException(e);
+        }
+        IAWriter w = writerFactory.createWriter(fields, filePrintStream, printerFactories, inputRecordDesc);
+        return new SinkWriterRuntime(w, ctx, filePrintStream, inputRecordDesc, true);
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
new file mode 100644
index 0000000..796ef0a
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
@@ -0,0 +1,98 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+
+public class StreamDieRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private IScalarEvaluatorFactory aftterObjectsEvalFactory;
+    private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
+
+    public StreamDieRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory, int[] projectionList,
+            IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
+        super(projectionList);
+        this.aftterObjectsEvalFactory = maxObjectsEvalFactory;
+        this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
+    }
+
+    @Override
+    public String toString() {
+        String s = "stream-die " + aftterObjectsEvalFactory.toString();
+        return s;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+        final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private IScalarEvaluator evalAfterObjects;
+            private int toWrite = -1;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (evalAfterObjects == null) {
+                    initAccessAppendRef(ctx);
+                    try {
+                        evalAfterObjects = aftterObjectsEvalFactory.createScalarEvaluator(ctx);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                if (toWrite < 0) {
+                    toWrite = evaluateInteger(evalAfterObjects, 0);
+                }
+                for (int t = 0; t < nTuple; t++) {
+                    if (toWrite > 0) {
+                        toWrite--;
+                        if (projectionList != null) {
+                            appendProjectionToFrame(t, projectionList);
+                        } else {
+                            appendTupleToFrame(t);
+                        }
+                    } else {
+                        throw new HyracksDataException("injected failure");
+                    }
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                super.close();
+            }
+
+            private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
+                tRef.reset(tAccess, tIdx);
+                try {
+                    eval.evaluate(tRef, p);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+                int lim = bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+                return lim;
+            }
+
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
new file mode 100644
index 0000000..4e9d51c
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -0,0 +1,136 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+
+public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private IScalarEvaluatorFactory maxObjectsEvalFactory;
+    private IScalarEvaluatorFactory offsetEvalFactory;
+    private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
+
+    public StreamLimitRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory,
+            IScalarEvaluatorFactory offsetEvalFactory, int[] projectionList,
+            IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
+        super(projectionList);
+        this.maxObjectsEvalFactory = maxObjectsEvalFactory;
+        this.offsetEvalFactory = offsetEvalFactory;
+        this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
+    }
+
+    @Override
+    public String toString() {
+        String s = "stream-limit " + maxObjectsEvalFactory.toString();
+        if (offsetEvalFactory != null) {
+            return s + ", " + offsetEvalFactory.toString();
+        } else {
+            return s;
+        }
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+        final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private IScalarEvaluator evalMaxObjects;
+            private IScalarEvaluator evalOffset = null;
+            private int toWrite = 0; // how many tuples still to write
+            private int toSkip = 0; // how many tuples still to skip
+            private boolean firstTuple = true;
+            private boolean afterLastTuple = false;
+
+            @Override
+            public void open() throws HyracksDataException {
+                // if (first) {
+                if (evalMaxObjects == null) {
+                    initAccessAppendRef(ctx);
+                    try {
+                        evalMaxObjects = maxObjectsEvalFactory.createScalarEvaluator(ctx);
+                        if (offsetEvalFactory != null) {
+                            evalOffset = offsetEvalFactory.createScalarEvaluator(ctx);
+                        }
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+                afterLastTuple = false;
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                if (afterLastTuple) {
+                    // ignore the data
+                    return;
+                }
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                int start = 0;
+                if (nTuple <= toSkip) {
+                    toSkip -= nTuple;
+                    return;
+                } else if (toSkip > 0) {
+                    start = toSkip;
+                    toSkip = 0;
+                }
+                for (int t = start; t < nTuple; t++) {
+                    if (firstTuple) {
+                        firstTuple = false;
+                        toWrite = evaluateInteger(evalMaxObjects, t);
+                        if (evalOffset != null) {
+                            toSkip = evaluateInteger(evalOffset, t);
+                        }
+                    }
+                    if (toSkip > 0) {
+                        toSkip--;
+                    } else if (toWrite > 0) {
+                        toWrite--;
+                        if (projectionList != null) {
+                            appendProjectionToFrame(t, projectionList);
+                        } else {
+                            appendTupleToFrame(t);
+                        }
+                    } else {
+                        // close();
+                        afterLastTuple = true;
+                        break;
+                    }
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                // if (!afterLastTuple) {
+                super.close();
+                // }
+            }
+
+            private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
+                tRef.reset(tAccess, tIdx);
+                try {
+                    eval.evaluate(tRef, p);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+                int lim = bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+                return lim;
+            }
+
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
new file mode 100644
index 0000000..455ad8e
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public StreamProjectRuntimeFactory(int[] projectionList) {
+        super(projectionList);
+    }
+
+    @Override
+    public String toString() {
+        return "stream-project " + Arrays.toString(projectionList);
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            private boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    first = false;
+                    initAccessAppend(ctx);
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    appendProjectionToFrame(t, projectionList);
+                }
+
+            }
+
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
new file mode 100644
index 0000000..b0c94b7
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+
+public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private IScalarEvaluatorFactory cond;
+
+    private IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+
+    /**
+     * @param cond
+     * @param projectionList
+     *            if projectionList is null, then no projection is performed
+     */
+    public StreamSelectRuntimeFactory(IScalarEvaluatorFactory cond, int[] projectionList,
+            IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
+        super(projectionList);
+        this.cond = cond;
+        this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
+    }
+
+    @Override
+    public String toString() {
+        return "stream-select " + cond.toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+        final IBinaryBooleanInspector bbi = binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx);
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private IScalarEvaluator eval;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (eval == null) {
+                    initAccessAppendRef(ctx);
+                    try {
+                        eval = cond.createScalarEvaluator(ctx);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    try {
+                        eval.evaluate(tRef, p);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                    if (bbi.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) {
+                        if (projectionList != null) {
+                            appendProjectionToFrame(t, projectionList);
+                        } else {
+                            appendTupleToFrame(t);
+                        }
+                    }
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
new file mode 100644
index 0000000..3f2765f
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IPrinter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private String command;
+    private IPrinterFactory[] printerFactories;
+    private char fieldDelimiter;
+    private ITupleParserFactory parserFactory;
+
+    public StringStreamingRuntimeFactory(String command, IPrinterFactory[] printerFactories, char fieldDelimiter,
+            ITupleParserFactory parserFactory) {
+        super(null);
+        this.command = command;
+        this.printerFactories = printerFactories;
+        this.fieldDelimiter = fieldDelimiter;
+        this.parserFactory = parserFactory;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        final IPrinter[] printers = new IPrinter[printerFactories.length];
+        for (int i = 0; i < printerFactories.length; i++) {
+            printers[i] = printerFactories[i].createPrinter();
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            final class ForwardScriptOutput implements Runnable {
+
+                private InputStream inStream;
+                private ITupleParser parser;
+
+                public ForwardScriptOutput(ITupleParser parser, InputStream inStream) {
+                    this.parser = parser;
+                    this.inStream = inStream;
+                }
+
+                @Override
+                public void run() {
+                    try {
+                        parser.parse(inStream, writer);
+                    } catch (HyracksDataException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        try {
+                            inStream.close();
+                        } catch (Exception e) {
+                        }
+                    }
+                }
+            }
+
+            final class DumpInStreamToPrintStream implements Runnable {
+
+                private BufferedReader reader;
+                private PrintStream printStream;
+
+                public DumpInStreamToPrintStream(InputStream inStream, PrintStream printStream) {
+                    this.reader = new BufferedReader(new InputStreamReader(inStream));
+                    this.printStream = printStream;
+                }
+
+                @Override
+                public void run() {
+                    String s;
+                    try {
+                        while ((s = reader.readLine()) != null) {
+                            printStream.println(s);
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        try {
+                            reader.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        printStream.close();
+                    }
+                }
+
+            }
+
+            private Process process;
+            private PrintStream ps;
+            private boolean first = true;
+            private Thread outputPipe;
+            private Thread dumpStderr;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    first = false;
+                    initAccessAppendRef(ctx);
+                }
+
+                try {
+                    ITupleParser parser = parserFactory.createTupleParser(ctx);
+                    process = Runtime.getRuntime().exec(command);
+                    ps = new PrintStream(process.getOutputStream());
+                    ForwardScriptOutput fso = new ForwardScriptOutput(parser, process.getInputStream());
+                    outputPipe = new Thread(fso);
+                    outputPipe.start();
+                    DumpInStreamToPrintStream disps = new DumpInStreamToPrintStream(process.getErrorStream(),
+                            System.err);
+                    dumpStderr = new Thread(disps);
+                    dumpStderr.start();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    for (int i = 0; i < printers.length; i++) {
+                        try {
+                            printers[i].print(buffer.array(), tRef.getFieldStart(i), tRef.getFieldLength(i), ps);
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                        ps.print(fieldDelimiter);
+                        if (i == printers.length - 1) {
+                            ps.print('\n');
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                // first close the printer printing to the process
+                ps.close();
+                int ret = 0;
+                // then wait for the process to finish
+
+                try {
+                    ret = process.waitFor();
+                    outputPipe.join();
+                    dumpStderr.join();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+                if (ret != 0) {
+                    throw new HyracksDataException("Process exit value: " + ret);
+                }
+                // close the following operator in the chain
+                super.close();
+            }
+        };
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
new file mode 100644
index 0000000..d42a294
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int outCol;
+    private final IUnnestingEvaluatorFactory unnestingFactory;
+    private int outColPos;
+    private final boolean outColIsProjected;
+
+    // Each time step() is called on the aggregate, a new value is written in
+    // its output. One byte is written before that value and is neglected.
+    // By convention, if the aggregate function writes nothing, it means it
+    // produced the last value.
+
+    public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+        super(projectionList);
+        this.outCol = outCol;
+        this.unnestingFactory = unnestingFactory;
+        outColPos = -1;
+        for (int f = 0; f < projectionList.length; f++) {
+            if (projectionList[f] == outCol) {
+                outColPos = f;
+            }
+        }
+        outColIsProjected = outColPos >= 0;
+    }
+
+    @Override
+    public String toString() {
+        return "unnest " + outCol + " <- " + unnestingFactory;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private IUnnestingEvaluator agg;
+            private ArrayTupleBuilder tupleBuilder;
+
+            @Override
+            public void open() throws HyracksDataException {
+                initAccessAppendRef(ctx);
+                try {
+                    agg = unnestingFactory.createUnnestingEvaluator(ctx);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+                tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    try {
+                        agg.init(tRef);
+                        boolean goon = true;
+                        do {
+                            tupleBuilder.reset();
+                            if (!agg.step(p)) {
+                                goon = false;
+                            } else {
+                                if (!outColIsProjected) {
+                                    appendProjectionToFrame(t, projectionList);
+                                } else {
+                                    for (int f = 0; f < outColPos; f++) {
+                                        tupleBuilder.addField(tAccess, t, f);
+                                    }
+                                    tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                                    for (int f = outColPos + 1; f < projectionList.length; f++) {
+                                        tupleBuilder.addField(tAccess, t, f);
+                                    }
+                                }
+                                appendToFrameFromTupleBuilder(tupleBuilder);
+                            }
+                        } while (goon);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+            }
+        };
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
new file mode 100644
index 0000000..9c53241
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
@@ -0,0 +1,54 @@
+package edu.uci.ics.hyracks.algebricks.runtime.writers;
+
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class PrinterBasedWriterFactory implements IAWriterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final PrinterBasedWriterFactory INSTANCE = new PrinterBasedWriterFactory();
+
+    public PrinterBasedWriterFactory() {
+    }
+
+    @Override
+    public IAWriter createWriter(final int[] fields, final PrintStream printStream,
+            final IPrinterFactory[] printerFactories, RecordDescriptor inputRecordDescriptor) {
+        final IPrinter[] printers = new IPrinter[printerFactories.length];
+        for (int i = 0; i < printerFactories.length; i++) {
+            printers[i] = printerFactories[i].createPrinter();
+        }
+
+        return new IAWriter() {
+
+            @Override
+            public void init() throws AlgebricksException {
+                for (int i = 0; i < printers.length; i++) {
+                    printers[i].init();
+                }
+            }
+
+            @Override
+            public void printTuple(IFrameTupleAccessor tAccess, int tIdx) throws AlgebricksException {
+                for (int i = 0; i < fields.length; i++) {
+                    int fldStart = tAccess.getTupleStartOffset(tIdx) + tAccess.getFieldSlotsLength()
+                            + tAccess.getFieldStartOffset(tIdx, fields[i]);
+                    int fldLen = tAccess.getFieldLength(tIdx, fields[i]);
+                    if (i > 0) {
+                        printStream.print("; ");
+                    }
+                    printers[i].print(tAccess.getBuffer().array(), fldStart, fldLen, printStream);
+                }
+                printStream.println();
+            }
+        };
+    }
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
new file mode 100644
index 0000000..61f822e
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.hyracks.algebricks.runtime.writers;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SerializedDataWriterFactory implements IAWriterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAWriter createWriter(final int[] fields, final PrintStream ps, IPrinterFactory[] printerFactories,
+            final RecordDescriptor inputRecordDescriptor) {
+        return new IAWriter() {
+
+            @Override
+            public void init() throws AlgebricksException {
+                // dump the SerializerDeserializers to disk
+                try {
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    ObjectOutputStream oos = new ObjectOutputStream(baos);
+                    oos.writeObject(inputRecordDescriptor);
+                    baos.writeTo(ps);
+                    oos.close();
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+
+            @Override
+            public void printTuple(IFrameTupleAccessor tAccess, int tIdx) throws AlgebricksException {
+                for (int i = 0; i < fields.length; i++) {
+                    int fldStart = tAccess.getTupleStartOffset(tIdx) + tAccess.getFieldSlotsLength()
+                            + tAccess.getFieldStartOffset(tIdx, fields[i]);
+                    int fldLen = tAccess.getFieldLength(tIdx, fields[i]);
+                    ps.write(tAccess.getBuffer().array(), fldStart, fldLen);
+                }
+            }
+        };
+    }
+}