merged hyracks_asterix_stabilization r1440:1453

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1488 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/pom.xml b/hyracks-algebricks/hyracks-algebricks-runtime/pom.xml
new file mode 100644
index 0000000..79bcaeb
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/pom.xml
@@ -0,0 +1,54 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-algebricks-runtime</artifactId>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks-algebricks</artifactId>
+    <version>0.2.1-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <!-- Compile with Java 1.6 source and target levels. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <!-- Sibling Hyracks modules, all pinned to the same version as the parent. -->
+  <dependencies>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-storage-am-btree</artifactId>
+      <version>0.2.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-storage-am-rtree</artifactId>
+      <version>0.2.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-dataflow-std</artifactId>
+      <version>0.2.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-algebricks-common</artifactId>
+      <version>0.2.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-algebricks-data</artifactId>
+      <version>0.2.1-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
new file mode 100644
index 0000000..973fef6
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
@@ -0,0 +1,60 @@
+package edu.uci.ics.hyracks.algebricks.runtime.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Factory for an aggregate function that counts the tuples it is stepped over
+ * and writes that count as an int to the provider's {@link DataOutput}.
+ */
+public class TupleCountAggregateFunctionFactory implements IAggregateFunctionFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException {
+        final DataOutput sink = provider.getDataOutput();
+
+        return new IAggregateFunction() {
+
+            // Number of step() calls since the last init().
+            private int tupleCount;
+
+            @Override
+            public void init() throws AlgebricksException {
+                tupleCount = 0;
+            }
+
+            @Override
+            public void step(IFrameTupleReference tuple) throws AlgebricksException {
+                tupleCount++;
+            }
+
+            @Override
+            public void finish() throws AlgebricksException {
+                emitCount();
+            }
+
+            @Override
+            public void finishPartial() throws AlgebricksException {
+                // The partial result is written exactly like the final one.
+                emitCount();
+            }
+
+            private void emitCount() throws AlgebricksException {
+                try {
+                    sink.writeInt(tupleCount);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
new file mode 100644
index 0000000..736baaa
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Factory for a running aggregate that numbers tuples: every step increments
+ * a counter and immediately writes the new value as an int to the provider's
+ * {@link DataOutput}.
+ */
+public class TupleCountRunningAggregateFunctionFactory implements IRunningAggregateFunctionFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IRunningAggregateFunction createRunningAggregateFunction(IDataOutputProvider provider)
+            throws AlgebricksException {
+        final DataOutput sink = provider.getDataOutput();
+
+        return new IRunningAggregateFunction() {
+
+            // Running count; the first step after init() emits 1.
+            private int tupleCount;
+
+            @Override
+            public void init() throws AlgebricksException {
+                tupleCount = 0;
+            }
+
+            @Override
+            public void step(IFrameTupleReference tuple) throws AlgebricksException {
+                try {
+                    sink.writeInt(++tupleCount);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
new file mode 100644
index 0000000..50f014b
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Serializable holder pairing an array of push-runtime factories with an
+ * array of record descriptors.
+ */
+public class AlgebricksPipeline implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IPushRuntimeFactory[] runtimeFactories;
+    private final RecordDescriptor[] recordDescriptors;
+
+    /**
+     * Stores both arrays by reference; no copies are made.
+     */
+    public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] recordDescriptors) {
+        this.recordDescriptors = recordDescriptors;
+        this.runtimeFactories = runtimeFactories;
+    }
+
+    public IPushRuntimeFactory[] getRuntimeFactories() {
+        return runtimeFactories;
+    }
+
+    public RecordDescriptor[] getRecordDescriptors() {
+        return recordDescriptors;
+    }
+
+    /**
+     * @return the field count of the last record descriptor in the array
+     */
+    public int getOutputWidth() {
+        final RecordDescriptor last = recordDescriptors[recordDescriptors.length - 1];
+        return last.getFieldCount();
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunction.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunction.java
new file mode 100644
index 0000000..8701c24
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunction.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * An aggregate driven through {@code init}, repeated {@code step} calls, and
+ * a closing {@code finish} or {@code finishPartial}.
+ */
+public interface IAggregateFunction {
+    /** Should be called each time a new aggregate value is computed. */
+    void init() throws AlgebricksException;
+
+    /** Feeds one input tuple into the aggregate. */
+    void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+    /** Produces the final result of the aggregate. */
+    void finish() throws AlgebricksException;
+
+    /** Produces a partial (intermediate) result of the aggregate. */
+    void finishPartial() throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunctionFactory.java
new file mode 100644
index 0000000..3c3bfeb
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunctionFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+/**
+ * Serializable factory for {@link IAggregateFunction} instances.
+ */
+public interface IAggregateFunctionFactory extends Serializable {
+    /**
+     * @param provider
+     *            supplies the data output made available to the new function
+     */
+    IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluator.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluator.java
new file mode 100644
index 0000000..11767e3
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluator.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Evaluates against one input tuple at a time.
+ */
+public interface IEvaluator {
+    /** Runs the evaluation for {@code tuple}. */
+    void evaluate(IFrameTupleReference tuple) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluatorFactory.java
new file mode 100644
index 0000000..dde64c0
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluatorFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+/**
+ * Serializable factory for {@link IEvaluator} instances.
+ */
+public interface IEvaluatorFactory extends Serializable {
+    /**
+     * @param output
+     *            supplies the data output made available to the new evaluator
+     */
+    IEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java
new file mode 100644
index 0000000..26fcc67
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * A frame writer that can be given other frame writers and the record
+ * descriptors of its inputs, each addressed by index.
+ */
+public interface IPushRuntime extends IFrameWriter {
+    /**
+     * Registers {@code writer} and its record descriptor under {@code index}
+     * (presumably an output slot - confirm against implementations).
+     */
+    void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+    /** Sets the record descriptor of the input at {@code index}. */
+    void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
new file mode 100644
index 0000000..3e349e1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+
+/**
+ * Serializable factory for {@link IPushRuntime} instances.
+ */
+public interface IPushRuntimeFactory extends Serializable {
+    /** Creates a push runtime for the given runtime context. */
+    IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunction.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunction.java
new file mode 100644
index 0000000..dc67f29
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * An aggregate advanced one tuple at a time; unlike
+ * {@link IAggregateFunction} it declares no finish methods.
+ */
+public interface IRunningAggregateFunction {
+    /** Prepares the aggregate before the first {@code step}. */
+    void init() throws AlgebricksException;
+
+    /** Advances the aggregate with one input tuple. */
+    void step(IFrameTupleReference tuple) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunctionFactory.java
new file mode 100644
index 0000000..632b6f3
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunctionFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+/**
+ * Serializable factory for {@link IRunningAggregateFunction} instances.
+ */
+public interface IRunningAggregateFunctionFactory extends Serializable {
+    /**
+     * @param provider
+     *            supplies the data output made available to the new function
+     */
+    IRunningAggregateFunction createRunningAggregateFunction(IDataOutputProvider provider)
+            throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunction.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunction.java
new file mode 100644
index 0000000..3167bce
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunction.java
@@ -0,0 +1,67 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * An aggregate whose intermediate state lives outside the function object:
+ * it is initialized through a {@link DataOutput} and afterwards handed back
+ * to each call as a byte range ({@code data}, {@code start}, {@code len}).
+ */
+public interface ISerializableAggregateFunction {
+    /**
+     * Initializes the space occupied by the internal state.
+     * 
+     * @param state
+     *            output the initial state is written to
+     * @throws AlgebricksException
+     */
+    void init(DataOutput state) throws AlgebricksException;
+
+    /**
+     * Updates the internal state.
+     * 
+     * @param tuple
+     *            input tuple to fold into the state
+     * @param data
+     *            array holding the state
+     * @param start
+     *            presumably the offset of the state within {@code data}
+     * @param len
+     *            presumably the length of the state in bytes
+     * @throws AlgebricksException
+     */
+    void step(IFrameTupleReference tuple, byte[] data, int start, int len) throws AlgebricksException;
+
+    /**
+     * Outputs the state to the result.
+     * 
+     * @param data
+     *            array holding the state
+     * @param start
+     *            presumably the offset of the state within {@code data}
+     * @param len
+     *            presumably the length of the state in bytes
+     * @param result
+     *            output the final result is written to
+     * @throws AlgebricksException
+     */
+    void finish(byte[] data, int start, int len, DataOutput result) throws AlgebricksException;
+
+    /**
+     * Outputs the partial state to the partial result.
+     * 
+     * @param data
+     *            array holding the state
+     * @param start
+     *            presumably the offset of the state within {@code data}
+     * @param len
+     *            presumably the length of the state in bytes
+     * @param partialResult
+     *            output the partial result is written to
+     * @throws AlgebricksException
+     */
+    void finishPartial(byte[] data, int start, int len, DataOutput partialResult) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunctionFactory.java
new file mode 100644
index 0000000..262aa5a
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunctionFactory.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+/**
+ * Serializable factory for {@link ISerializableAggregateFunction} instances.
+ */
+public interface ISerializableAggregateFunctionFactory extends Serializable {
+    /** Creates a new serializable aggregate function. */
+    ISerializableAggregateFunction createAggregateFunction() throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunction.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunction.java
new file mode 100644
index 0000000..d0e4e06
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * A function initialized with a single tuple and then advanced step by step.
+ */
+public interface IUnnestingFunction {
+    /** Starts unnesting over {@code tuple}. */
+    void init(IFrameTupleReference tuple) throws AlgebricksException;
+
+    /**
+     * Advances by one step.
+     * 
+     * @return presumably {@code true} while the step produced an item and
+     *         {@code false} once exhausted - confirm against implementations
+     */
+    boolean step() throws AlgebricksException;
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunctionFactory.java
new file mode 100644
index 0000000..196708d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunctionFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+/**
+ * Serializable factory for {@link IUnnestingFunction} instances.
+ */
+public interface IUnnestingFunctionFactory extends Serializable {
+    /**
+     * @param provider
+     *            supplies the data output made available to the new function
+     */
+    IUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
new file mode 100644
index 0000000..8ab92ee
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.context;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+
+/**
+ * Maps file ids to {@link BTree} instances.
+ * <p>
+ * None of the map operations acquire the registry lock themselves, and the
+ * backing {@link HashMap} is not thread-safe; callers that need mutual
+ * exclusion must bracket their calls with {@link #lock()} and
+ * {@link #unlock()}.
+ */
+public class AsterixBTreeRegistry {
+
+    // Both fields are assigned once; final documents that and guarantees
+    // they are visible to any thread that obtains the registry.
+    private final HashMap<Integer, BTree> map = new HashMap<Integer, BTree>();
+    private final Lock registryLock = new ReentrantLock();
+
+    /** @return the tree registered under {@code fileId}, or {@code null} if there is none */
+    public BTree get(int fileId) {
+        return map.get(fileId);
+    }
+
+    // TODO: not very high concurrency, but good enough for now
+    public void lock() {
+        registryLock.lock();
+    }
+
+    public void unlock() {
+        registryLock.unlock();
+    }
+
+    public void register(int fileId, BTree btree) {
+        map.put(fileId, btree);
+    }
+
+    public void unregister(int fileId) {
+        map.remove(fileId);
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java
new file mode 100644
index 0000000..8a1cf0a
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.context;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Mutable holder for an {@link IHyracksTaskContext}.
+ */
+public class RuntimeContext {
+    private IHyracksTaskContext hyracksContext;
+
+    /** Creates a context whose Hyracks task context is {@code null} until set. */
+    public RuntimeContext() {
+    }
+
+    public IHyracksTaskContext getHyracksContext() {
+        return this.hyracksContext;
+    }
+
+    public void setHyracksContext(IHyracksTaskContext hyracksContext) {
+        this.hyracksContext = hyracksContext;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
new file mode 100644
index 0000000..d019845
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ColumnAccessEvalFactory implements IEvaluatorFactory {
+    // Factory for evaluators that copy one field of the input tuple, unchanged, to the output.
+    private static final long serialVersionUID = 1L;
+    // Index of the tuple field that the created evaluators copy.
+    private final int fieldIndex;
+    /** @param fieldIndex index of the tuple field to copy; it is not range-checked here */
+    public ColumnAccessEvalFactory(int fieldIndex) {
+        this.fieldIndex = fieldIndex;
+    }
+    /** Returns "ColumnAccess(" followed by the field index and ")". */
+    @Override
+    public String toString() {
+        return "ColumnAccess(" + fieldIndex + ")";
+    }
+    /** Creates an evaluator that writes the field's raw bytes to the DataOutput obtained from output. */
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+            // Fetched once when the evaluator is created and reused by every evaluate call.
+            private DataOutput out = output.getDataOutput();
+            /** Writes length bytes starting at start of the field's backing array; IOException is wrapped. */
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                byte[] buffer = tuple.getFieldData(fieldIndex);
+                int start = tuple.getFieldStart(fieldIndex);
+                int length = tuple.getFieldLength(fieldIndex);
+                try {
+                    out.write(buffer, start, length);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
new file mode 100644
index 0000000..39f56b8
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ConstantEvalFactory implements IEvaluatorFactory { // evaluators emit one fixed byte sequence
+    private static final long serialVersionUID = 1L;
+    // Bytes written verbatim on every evaluation; the caller's array is kept by reference (no copy).
+    private byte[] value;
+    /** @param value bytes to emit; value.length is read on each evaluation, so it must be non-null by then */
+    public ConstantEvalFactory(byte[] value) {
+        this.value = value;
+    }
+    /** Returns the fixed label "Constant"; the bytes themselves are not rendered. */
+    @Override
+    public String toString() {
+        return "Constant";
+    }
+    /** Creates an evaluator that ignores its input tuple and writes value to output's DataOutput. */
+    @Override
+    public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        return new IEvaluator() {
+            // Fetched once when the evaluator is created and reused by every evaluate call.
+            private DataOutput out = output.getDataOutput();
+            /** Writes the whole constant array; the tuple argument is unused. IOException is wrapped. */
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                try {
+                    out.write(value, 0, value.length);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
new file mode 100644
index 0000000..f6a4f9d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+    // Factory for a push runtime that folds all tuples seen between open() and close() into one output tuple.
+    private static final long serialVersionUID = 1L;
+
+    // One factory per output field: field i of the result is produced by aggregFactories[i].
+    private IAggregateFunctionFactory[] aggregFactories;
+
+    public AggregateRuntimeFactory(IAggregateFunctionFactory[] aggregFactories) {
+        super(null);
+        // The array is stored by reference, not copied.
+        this.aggregFactories = aggregFactories;
+    }
+    /** Renders as "aggregate [f0, f1, ...]" using each factory's own toString. */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("aggregate [");
+        for (int i = 0; i < aggregFactories.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append(aggregFactories[i]);
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+    /** Creates the runtime; aggregate functions are instantiated lazily on the first open(). */
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+            throws AlgebricksException {
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            private IAggregateFunction[] aggregs = new IAggregateFunction[aggregFactories.length];
+            private ArrayBackedValueStorage evalOutput = new ArrayBackedValueStorage();
+            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
+            // True until the first open(); guards the one-time creation of the aggregate functions.
+            private boolean first = true;
+            /** Creates the functions once, re-initializes them on every call, then opens the writer. */
+            @Override
+            public void open() throws HyracksDataException {
+                try {
+                    if (first) {
+                        first = false;
+                        initAccessAppendRef(context);
+                        for (int i = 0; i < aggregFactories.length; i++) {
+                            aggregs[i] = aggregFactories[i].createAggregateFunction(evalOutput);
+                        }
+                    }
+                    for (int i = 0; i < aggregFactories.length; i++) {
+                        aggregs[i].init();
+                    }
+                } catch (AlgebricksException e) {
+                    throw new HyracksDataException(e);
+                }
+
+                writer.open();
+            }
+            /** Feeds every tuple of the frame to every aggregate function; nothing is emitted here. */
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    processTuple(tRef);
+                }
+
+            }
+            /** Finishes the aggregates, appends the single result tuple, then delegates to super.close(). */
+            @Override
+            public void close() throws HyracksDataException {
+                computeAggregate();
+                appendToFrameFromTupleBuilder(tupleBuilder);
+                super.close();
+            }
+            /** Rebuilds tupleBuilder with one field per aggregate, each taken from evalOutput after finish(). */
+            private void computeAggregate() throws HyracksDataException {
+                tupleBuilder.reset();
+                for (int f = 0; f < aggregs.length; f++) {
+                    evalOutput.reset();
+                    try {
+                        aggregs[f].finish();
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    tupleBuilder.addField(evalOutput.getBytes(), evalOutput.getStartIndex(), evalOutput.getLength());
+                }
+            }
+            /** Calls step on every aggregate for one tuple; AlgebricksException is wrapped. */
+            private void processTuple(FrameTupleReference tupleRef) throws HyracksDataException {
+                for (int f = 0; f < aggregs.length; f++) {
+                    try {
+                        aggregs[f].step(tupleRef);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+            /** Propagates the failure to the downstream writer without emitting a result. */
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..dfd9ec7
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+    // Builds aggregators that push every tuple they receive through nested Algebricks pipelines (subplans).
+    private static final long serialVersionUID = 1L;
+    private AlgebricksPipeline[] subplans; // one nested pipeline is assembled per entry
+    private int[] keyFieldIdx; // input field indexes copied first into the result tuple
+    private int[] decorFieldIdx; // input field indexes copied right after the key fields
+    /** Stores the three arrays by reference; none of them is copied or validated. */
+    public NestedPlansAccumulatingAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx,
+            int[] decorFieldIdx) {
+        this.subplans = subplans;
+        this.keyFieldIdx = keyFieldIdx;
+        this.decorFieldIdx = decorFieldIdx;
+    }
+    /** Assembles one pipeline per subplan, all writing into a shared AggregatorOutput, and wraps them. */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
+        // The keys/partialKeys arguments are not read; the factory's own keyFieldIdx/decorFieldIdx are used.
+        final RuntimeContext rc = new RuntimeContext();
+        rc.setHyracksContext(ctx);
+        final AggregatorOutput outputWriter = new AggregatorOutput(ctx.getFrameSize(), subplans, keyFieldIdx.length,
+                decorFieldIdx.length);
+        final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+        for (int i = 0; i < subplans.length; i++) {
+            try {
+                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, rc);
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        return new IAggregatorDescriptor() {
+            /** Starts a result: copies key then decor fields, opens all pipelines, feeds the first tuple. */
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+                tb.reset();
+                for (int i = 0; i < keyFieldIdx.length; ++i) {
+                    tb.addField(accessor, tIndex, keyFieldIdx[i]);
+                }
+                for (int i = 0; i < decorFieldIdx.length; ++i) {
+                    tb.addField(accessor, tIndex, decorFieldIdx[i]);
+                }
+                for (int i = 0; i < pipelines.length; ++i) {
+                    pipelines[i].open();
+                }
+
+                // aggregate the first tuple
+                for (int i = 0; i < pipelines.length; i++) {
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                }
+            }
+            /** Feeds one more tuple to every pipeline; stateAccessor, stateTupleIndex and state are unused. */
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                // it only works if the output of the aggregator fits in one
+                // frame
+                for (int i = 0; i < pipelines.length; i++) {
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                }
+            }
+            /** Closes the pipelines in order, then copies all accumulated fields into tupleBuilder. */
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].close();
+                }
+                // Copy every field held by outputWriter's builder into the caller's builder, one by one.
+                tupleBuilder.reset();
+                ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+                byte[] data = tb.getByteArray();
+                int[] fieldEnds = tb.getFieldEndOffsets();
+                int start = 0;
+                int offset = 0; // despite the name, this holds the current field's byte length
+                for (int i = 0; i < fieldEnds.length; i++) {
+                    if (i > 0)
+                        start = fieldEnds[i - 1];
+                    offset = fieldEnds[i] - start;
+                    tupleBuilder.addField(data, start, offset);
+                }
+            }
+            /** Returns a fresh, empty AggregateState; this aggregator never reads or writes it. */
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState();
+            }
+            /** No-op. */
+            @Override
+            public void reset() {
+
+            }
+            /** Not supported: always throws IllegalStateException. */
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+            /** No-op; the pipelines are closed in outputFinalResult instead. */
+            @Override
+            public void close() {
+
+            }
+
+        };
+    }
+    /** Chains the subplan's runtimes back to front so each writes to its successor; returns the head. */
+    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, RuntimeContext rc)
+            throws AlgebricksException {
+        // plug the operators
+        IFrameWriter start = writer;
+        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+            IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(rc);
+            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            if (i > 0) {
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
+            } else {
+                // the nts has the same input and output rec. desc.
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+            }
+            start = newRuntime;
+        }
+        return start;
+    }
+
+    /**
+     * For now we assume that each subplan produces exactly one tuple.
+     */
+    private static class AggregatorOutput implements IFrameWriter {
+
+        // One accessor per subplan, built over that subplan's last record descriptor.
+        private FrameTupleAccessor[] tAccess;
+        private RecordDescriptor[] inputRecDesc;
+        private int inputIdx;
+        private ArrayTupleBuilder tb;
+        private AlgebricksPipeline[] subplans;
+        // Sizes the builder for numKeys + numDecors + the summed output widths of all subplans.
+        public AggregatorOutput(int frameSize, AlgebricksPipeline[] subplans, int numKeys, int numDecors) {
+            this.subplans = subplans;
+            // Record each subplan's final record descriptor and add up the subplans' output widths.
+            int totalAggFields = 0;
+            this.inputRecDesc = new RecordDescriptor[subplans.length];
+            for (int i = 0; i < subplans.length; i++) {
+                RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
+                this.inputRecDesc[i] = rd[rd.length - 1];
+                totalAggFields += subplans[i].getOutputWidth();
+            }
+            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+
+            this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
+            for (int i = 0; i < inputRecDesc.length; i++) {
+                tAccess[i] = new FrameTupleAccessor(frameSize, inputRecDesc[i]);
+            }
+        }
+        /** No-op. */
+        @Override
+        public void open() throws HyracksDataException {
+
+        }
+
+        /**
+         * Since each pipeline only produces one tuple, this method is only
+         * called by the close method of the pipelines.
+         */
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            int tIndex = 0;
+            int w = subplans[inputIdx].getOutputWidth();
+            IFrameTupleAccessor accessor = tAccess[inputIdx];
+            accessor.reset(buffer);
+            for (int f = 0; f < w; f++) {
+                tb.addField(accessor, tIndex, f);
+            }
+        }
+        /** No-op. */
+        @Override
+        public void close() throws HyracksDataException {
+            // Nothing is released or flushed here.
+        }
+        /** Selects which subplan's accessor and output width the next nextFrame call uses. */
+        public void setInputIdx(int inputIdx) {
+            this.inputIdx = inputIdx;
+        }
+        /** Returns the shared builder itself (not a copy); callers reset and append to it directly. */
+        public ArrayTupleBuilder getTupleBuilder() {
+            return tb;
+        }
+        /** No-op. */
+        @Override
+        public void fail() throws HyracksDataException {
+        }
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..ec9a63c
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -0,0 +1,147 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ISerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ISerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+    private static final long serialVersionUID = 1L;
+    private ISerializableAggregateFunctionFactory[] aggFactories; // one factory per aggregate state field
+    /** Stores the factory array by reference; it is not copied or validated. */
+    public SerializableAggregatorDescriptorFactory(ISerializableAggregateFunctionFactory[] aggFactories) {
+        this.aggFactories = aggFactories;
+    }
+    /** Creates an aggregator whose state fields follow the key fields; only keyFields is read here. */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
+        final int[] keys = keyFields;
+
+        /**
+         * one IAggregatorDescriptor instance per Gby operator
+         */
+        return new IAggregatorDescriptor() {
+            private FrameTupleReference ftr = new FrameTupleReference();
+            private ISerializableAggregateFunction[] aggs = new ISerializableAggregateFunction[aggFactories.length];
+            private int offsetFieldIndex = keys.length; // index of the first aggregate state field
+            private int stateFieldLength[] = new int[aggFactories.length]; // byte lengths measured by the latest init()
+            /** Returns a fresh, empty AggregateState; this aggregator never reads or writes it. */
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState();
+            }
+            /** Appends one initial state field per aggregate to tb, then steps each on the given tuple. */
+            @Override
+            public void init(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                    throws HyracksDataException {
+                DataOutput output = tb.getDataOutput();
+                ftr.reset(accessor, tIndex);
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        int begin = tb.getSize();
+                        if (aggs[i] == null) {
+                            aggs[i] = aggFactories[i].createAggregateFunction();
+                        }
+                        aggs[i].init(output);
+                        tb.addFieldEndOffset();
+                        stateFieldLength[i] = tb.getSize() - begin;
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                // NOTE(review): stateFieldLength is overwritten by every init(); presumably state sizes match across groups - confirm.
+                // doing initial aggregate
+                ftr.reset(accessor, tIndex); // same arguments as the reset at the top of this method
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        byte[] data = tb.getByteArray();
+                        int prevFieldPos = i + keys.length - 1; // presumably tb already holds the key fields - confirm
+                        int start = prevFieldPos >= 0 ? tb.getFieldEndOffsets()[prevFieldPos] : 0;
+                        aggs[i].step(ftr, data, start, stateFieldLength[i]);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+            /** Steps each aggregate on the input tuple against its state bytes in stateAccessor's buffer. */
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                ftr.reset(accessor, tIndex);
+                int stateTupleStart = stateAccessor.getTupleStartOffset(stateTupleIndex);
+                int fieldSlotLength = stateAccessor.getFieldSlotsLength();
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        byte[] data = stateAccessor.getBuffer().array();
+                        int start = stateAccessor.getFieldStartOffset(stateTupleIndex, i + keys.length)
+                                + stateTupleStart + fieldSlotLength;
+                        aggs[i].step(ftr, data, start, stateFieldLength[i]);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+            /** Calls finishPartial per aggregate, walking the state fields in order from field keys.length. */
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                byte[] data = accessor.getBuffer().array();
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
+                int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
+                int start = refOffset;
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        aggs[i].finishPartial(data, start, stateFieldLength[i], tb.getDataOutput());
+                        start += stateFieldLength[i];
+                        tb.addFieldEndOffset();
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+            /** Same walk as outputPartialResult, but calls finish to emit each aggregate's final value. */
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                byte[] data = accessor.getBuffer().array();
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
+                int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
+                int start = refOffset;
+                for (int i = 0; i < aggs.length; i++) {
+                    try {
+                        aggs[i].finish(data, start, stateFieldLength[i], tb.getDataOutput());
+                        start += stateFieldLength[i];
+                        tb.addFieldEndOffset();
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+            /** No-op. */
+            @Override
+            public void reset() {
+
+            }
+            /** Delegates to reset(), which currently does nothing. */
+            @Override
+            public void close() {
+                reset();
+            }
+
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..99feae1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+/**
+ * Builds {@link IAggregatorDescriptor}s whose {@link AggregateState} carries the live aggregate functions together
+ * with the buffers they write into. Partial results are unsupported: {@code outputPartialResult} always throws.
+ */
+public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private IAggregateFunctionFactory[] aggFactories;
+
+    /** Retains only {@code aggFactories}; {@code keys} and {@code fdColumns} are accepted but not used. */
+    public SimpleAlgebricksAccumulatingAggregatorFactory(IAggregateFunctionFactory[] aggFactories, int[] keys,
+            int[] fdColumns) {
+        this.aggFactories = aggFactories;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+        return new IAggregatorDescriptor() {
+            private FrameTupleReference ftr = new FrameTupleReference();
+
+            /** Casts the opaque state back to the (output buffers, functions) pair it was created with. */
+            private Pair<ArrayBackedValueStorage[], IAggregateFunction[]> unpack(AggregateState state) {
+                return (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+            }
+
+            /** Points {@code ftr} at the given tuple, then steps every function over it in array order. */
+            private void stepAll(IAggregateFunction[] fns, IFrameTupleAccessor accessor, int tIndex)
+                    throws HyracksDataException {
+                ftr.reset(accessor, tIndex);
+                for (IAggregateFunction fn : fns) {
+                    try {
+                        fn.step(ftr);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage[] buffers = unpack(state).first;
+                IAggregateFunction[] fns = unpack(state).second;
+                for (int k = 0; k < fns.length; k++) {
+                    buffers[k].reset();
+                    try {
+                        fns[k].init();
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                stepAll(fns, accessor, tIndex);
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                stepAll(unpack(state).second, accessor, tIndex);
+            }
+
+            @Override
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                ArrayBackedValueStorage[] buffers = unpack(state).first;
+                IAggregateFunction[] fns = unpack(state).second;
+                for (int k = 0; k < fns.length; k++) {
+                    try {
+                        fns[k].finish();
+                        ArrayBackedValueStorage out = buffers[k];
+                        tupleBuilder.addField(out.getBytes(), out.getStartIndex(), out.getLength());
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                ArrayBackedValueStorage[] buffers = new ArrayBackedValueStorage[aggFactories.length];
+                IAggregateFunction[] fns = new IAggregateFunction[aggFactories.length];
+                for (int k = 0; k < fns.length; k++) {
+                    buffers[k] = new ArrayBackedValueStorage();
+                    try {
+                        fns[k] = aggFactories[k].createAggregateFunction(buffers[k]);
+                    } catch (AlgebricksException e) {
+                        // This method declares no checked exception, so the failure is rethrown unchecked.
+                        throw new IllegalStateException(e);
+                    }
+                }
+                return new AggregateState(new Pair<ArrayBackedValueStorage[], IAggregateFunction[]>(buffers, fns));
+            }
+
+            @Override
+            public void reset() {
+                // No-op.
+            }
+
+            @Override
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+
+            @Override
+            public void close() {
+                // No-op.
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
new file mode 100644
index 0000000..0ff1b89
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+/** Push runtime with one input and one output that batches outgoing tuples in a single, reused frame. */
+public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends AbstractOneInputOneOutputPushRuntime {
+    protected FrameTupleAppender appender;
+    protected ByteBuffer frame;
+    protected FrameTupleAccessor tAccess;
+    protected FrameTupleReference tRef;
+    /** Flushes any buffered tuples and closes the downstream writer, even when that final flush throws. */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+        } finally {
+            writer.close();
+        }
+        appender.reset(frame, true);
+    }
+    /** Appends the tuple held by {@code tb}; if it does not fit, the frame is flushed, cleared and retried once. */
+    protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
+        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException(
+                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+            }
+        }
+    }
+    /** Same flush-and-retry append, for the projection {@code projectionList} of input tuple {@code tIndex}. */
+    protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
+        if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+                throw new IllegalStateException(
+                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
+            }
+        }
+    }
+    /** Same flush-and-retry append, for the whole input tuple {@code tIndex} read through {@code tAccess}. */
+    protected void appendTupleToFrame(int tIndex) throws HyracksDataException {
+        if (!appender.append(tAccess, tIndex)) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.append(tAccess, tIndex)) {
+                throw new IllegalStateException(
+                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendTupleToFrame).");
+            }
+        }
+    }
+    /** Allocates the output frame, binds a cleared appender to it, and creates the input tuple accessor. */
+    protected final void initAccessAppend(RuntimeContext context) {
+        IHyracksTaskContext hCtx = context.getHyracksContext();
+        frame = hCtx.allocateFrame();
+        appender = new FrameTupleAppender(hCtx.getFrameSize());
+        appender.reset(frame, true);
+        tAccess = new FrameTupleAccessor(hCtx.getFrameSize(), inputRecordDesc);
+    }
+    /** Does everything {@link #initAccessAppend} does and additionally creates {@code tRef}. */
+    protected final void initAccessAppendRef(RuntimeContext context) {
+        initAccessAppend(context);
+        tRef = new FrameTupleReference();
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
new file mode 100644
index 0000000..22f7df6
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/** Adds storage for the record descriptor of the input on top of {@link AbstractOneInputPushRuntime}. */
+public abstract class AbstractOneInputOneOutputPushRuntime extends AbstractOneInputPushRuntime {
+    protected RecordDescriptor inputRecordDesc;
+
+    /** Stores {@code recordDescriptor} as the input descriptor; {@code index} is not consulted. */
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        inputRecordDesc = recordDescriptor;
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
new file mode 100644
index 0000000..81db95e
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+
+/** Base factory for one-input/one-output push runtimes; it stores the projection list it is constructed with. */
+public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRuntimeFactory {
+    private static final long serialVersionUID = 1L;
+
+    protected int[] projectionList;
+
+    public AbstractOneInputOneOutputRuntimeFactory(int[] projectionList) {
+        this.projectionList = projectionList;
+    }
+
+    /** Produces the concrete runtime; {@link #createPushRuntime} simply returns its result. */
+    public abstract AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(RuntimeContext context)
+            throws AlgebricksException;
+
+    @Override
+    public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException {
+        return createOneOutputPushRuntime(context);
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
new file mode 100644
index 0000000..b214f83
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Push runtime base that remembers its downstream writer and output descriptor and relays fail() to that writer. */
+public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
+    protected IFrameWriter writer;
+    protected RecordDescriptor outputRecordDesc;
+    @Override
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        outputRecordDesc = recordDesc; // index is not consulted
+        this.writer = writer;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
new file mode 100644
index 0000000..0fbe51f
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/** Base for sink runtimes: stores the input record descriptor and rejects setFrameWriter by throwing. */
+public abstract class AbstractOneInputSinkPushRuntime implements IPushRuntime {
+    protected RecordDescriptor inputRecordDesc;
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        inputRecordDesc = recordDescriptor; // index is not consulted
+    }
+
+    @Override
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        throw new IllegalStateException();
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
new file mode 100644
index 0000000..1d23d14
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Base for runtimes that take no input: nextFrame and setInputRecordDescriptor are unsupported, close is empty. */
+public abstract class AbstractOneInputSourcePushRuntime extends AbstractOneInputPushRuntime {
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail(); // relays the failure to the downstream writer
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
new file mode 100644
index 0000000..81e91f6
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Factory for a runtime that ignores its input: every lifecycle callback of the created runtime is empty. */
+public class SinkRuntimeFactory implements IPushRuntimeFactory {
+    private static final long serialVersionUID = 1L;
+
+    public SinkRuntimeFactory() {
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException {
+        return new DiscardingRuntime();
+    }
+
+    @Override
+    public String toString() {
+        return "sink";
+    }
+    /** Runtime whose open, nextFrame, fail and close all do nothing. */
+    private static final class DiscardingRuntime extends AbstractOneInputSinkPushRuntime {
+        @Override
+        public void open() throws HyracksDataException {
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+        }
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
new file mode 100644
index 0000000..ea83fdd
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -0,0 +1,94 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+/**
+ * Factory for a push runtime that hands its whole lifecycle (open, nextFrame, fail, close) to a
+ * {@link PreclusteredGroupWriter} built from the configured group fields, comparators and aggregator.
+ */
+public class MicroPreClusteredGroupRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final int[] groupFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private final RecordDescriptor inRecordDesc;
+    private final RecordDescriptor outRecordDesc;
+
+    /**
+     * @throws NotImplementedException if {@code projectionList} is non-null; projections cannot be pushed here.
+     */
+    public MicroPreClusteredGroupRuntimeFactory(int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+            RecordDescriptor outRecordDesc, int[] projectionList) {
+        super(projectionList);
+        if (projectionList != null) {
+            throw new NotImplementedException("Cannot push projection into MicroPreClusteredGroupRuntime.");
+        }
+        this.groupFields = groupFields;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        this.inRecordDesc = inRecordDesc;
+        this.outRecordDesc = outRecordDesc;
+    }
+
+    /** Creates the comparators and the aggregator up front; the group writer itself is built in {@code open()}. */
+    @Override
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(RuntimeContext context)
+            throws AlgebricksException {
+        try {
+            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+            for (int i = 0; i < comparatorFactories.length; ++i) {
+                comparators[i] = comparatorFactories[i].createBinaryComparator();
+            }
+            final IHyracksTaskContext ctx = context.getHyracksContext();
+            final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+                    outRecordDesc, groupFields, groupFields);
+
+            return new AbstractOneInputOneOutputPushRuntime() {
+
+                private PreclusteredGroupWriter pgw;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
+                            outRecordDesc, writer);
+                    pgw.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    pgw.nextFrame(buffer);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    pgw.fail();
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    pgw.close();
+                }
+            };
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
new file mode 100644
index 0000000..91fa8a1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import java.nio.ByteBuffer;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * Hyracks operator descriptor that runs an {@link AlgebricksPipeline} of push-runtime
+ * factories ("micro-operators") inside a single Hyracks operator.
+ */
+public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    // runtime factories and record descriptors of the local runtime pipeline
+    private final AlgebricksPipeline pipeline;
+
+    /**
+     * @param spec
+     *            registry this descriptor is created in
+     * @param inputArity
+     *            number of inputs; 0 makes the operator a source
+     * @param outputArity
+     *            number of outputs
+     * @param runtimeFactories
+     *            factories of the runtimes that make up the pipeline
+     * @param internalRecordDescriptors
+     *            record descriptors of the pipeline; when {@code outputArity} is 1 the last one
+     *            is used as the output record descriptor of this operator
+     */
+    public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
+            IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
+        super(spec, inputArity, outputArity);
+        if (outputArity == 1) {
+            this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
+        }
+        this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
+    }
+
+    public AlgebricksPipeline getPipeline() {
+        return pipeline;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject json = super.toJSON();
+        json.put("micro-operators", pipeline.getRuntimeFactories());
+        return json;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Asterix { \n");
+        for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+            sb.append("  ").append(f).append(";\n");
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        if (inputArity == 0) {
+            return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        } else {
+            return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        }
+    }
+
+    /**
+     * Builds the pushable used when the operator has no input: the pipeline is assembled,
+     * opened and closed within {@code initialize()}.
+     */
+    private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                IFrameWriter startOfPipeline;
+                RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+                        : null;
+
+                PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
+                        pipelineOutputRecordDescriptor);
+                try {
+                    RuntimeContext rc = new RuntimeContext();
+                    rc.setHyracksContext(ctx);
+                    startOfPipeline = pa.assemblePipeline(writer, rc);
+                } catch (AlgebricksException e) {
+                    throw new HyracksDataException(e);
+                }
+                startOfPipeline.open();
+                startOfPipeline.close();
+            }
+        };
+    }
+
+    /**
+     * Builds the pushable used when the operator has an input: the pipeline is assembled on the
+     * first {@code open()} and every frame-writer call is forwarded to its first runtime.
+     */
+    private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+            private IFrameWriter startOfPipeline;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (startOfPipeline == null) {
+                    RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+                            : null;
+                    RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
+                            AlgebricksMetaOperatorDescriptor.this.getOperatorId(), 0);
+                    PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
+                            pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
+                    try {
+                        RuntimeContext rc = new RuntimeContext();
+                        rc.setHyracksContext(ctx);
+                        startOfPipeline = pa.assemblePipeline(writer, rc);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                startOfPipeline.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                startOfPipeline.nextFrame(buffer);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                startOfPipeline.close();
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                // Forward the failure like open/nextFrame/close do, so the runtimes of the
+                // pipeline are notified; there is nothing to notify if open() never
+                // managed to assemble the pipeline.
+                if (startOfPipeline != null) {
+                    startOfPipeline.fail();
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
new file mode 100644
index 0000000..1041438
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Instantiates the runtimes of an {@link AlgebricksPipeline} and chains them, so that each
+ * runtime writes its output to the runtime that follows it in the pipeline.
+ */
+public class PipelineAssembler {
+
+    private final AlgebricksPipeline pipeline;
+    private final int inputArity;
+    private final int outputArity;
+
+    // record descriptors at the two ends of the pipeline
+    private final RecordDescriptor pipelineInputRecordDescriptor;
+    private final RecordDescriptor pipelineOutputRecordDescriptor;
+
+    public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
+            RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) {
+        this.pipeline = pipeline;
+        this.inputArity = inputArity;
+        this.outputArity = outputArity;
+        this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
+        this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
+    }
+
+    /**
+     * Creates one runtime per factory, going from the last factory of the pipeline to the first,
+     * and plugs every runtime into the one created just before it.
+     *
+     * @param writer
+     *            writer handed to the last runtime of the pipeline when {@code outputArity} is 1
+     * @param rc
+     *            context passed to each runtime factory
+     * @return the first runtime of the pipeline, or {@code writer} if there are no factories
+     */
+    public IFrameWriter assemblePipeline(IFrameWriter writer, RuntimeContext rc) throws AlgebricksException {
+        final int lastIdx = pipeline.getRuntimeFactories().length - 1;
+        IFrameWriter downstream = writer;
+        for (int idx = lastIdx; idx >= 0; idx--) {
+            IPushRuntime runtime = pipeline.getRuntimeFactories()[idx].createPushRuntime(rc);
+
+            // output side: only the last runtime uses the pipeline-level descriptor
+            if (idx != lastIdx) {
+                runtime.setFrameWriter(0, downstream, pipeline.getRecordDescriptors()[idx]);
+            } else if (outputArity == 1) {
+                runtime.setFrameWriter(0, downstream, pipelineOutputRecordDescriptor);
+            }
+
+            // input side: only the first runtime uses the pipeline-level descriptor
+            if (idx > 0) {
+                runtime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[idx - 1]);
+            } else if (inputArity > 0) {
+                runtime.setInputRecordDescriptor(0, pipelineInputRecordDescriptor);
+            }
+            downstream = runtime;
+        }
+        return downstream;
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
new file mode 100644
index 0000000..c7adf1f
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Factory of the runtime that runs a nested pipeline once for every input tuple and
+ * concatenates the input tuple with each tuple the pipeline produces. When the pipeline
+ * produces no frame for an input tuple, the tuple is emitted followed by nulls instead.
+ */
+public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final AlgebricksPipeline pipeline;
+    private final RecordDescriptor inputRecordDesc;
+    private final INullWriterFactory[] nullWriterFactories;
+
+    /**
+     * @param projectionList
+     *            must be {@code null}; pushing a projection into the subplan is not implemented
+     */
+    public SubplanRuntimeFactory(AlgebricksPipeline pipeline, INullWriterFactory[] nullWriterFactories,
+            RecordDescriptor inputRecordDesc, int[] projectionList) {
+        super(projectionList);
+        if (projectionList != null) {
+            throw new NotImplementedException();
+        }
+        this.pipeline = pipeline;
+        this.nullWriterFactories = nullWriterFactories;
+        this.inputRecordDesc = inputRecordDesc;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder out = new StringBuilder("Subplan { \n");
+        for (IPushRuntimeFactory factory : pipeline.getRuntimeFactories()) {
+            out.append("  ").append(factory.toString()).append(";\n");
+        }
+        return out.append("}").toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+            throws AlgebricksException {
+
+        // the pipeline is assembled without an output record descriptor
+        final PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, null);
+        final int nullCount = nullWriterFactories.length;
+        final INullWriter[] nullWriters = new INullWriter[nullCount];
+        for (int w = 0; w < nullCount; w++) {
+            nullWriters[w] = nullWriterFactories[w].createNullWriter();
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            /**
+             * Sink of the nested pipeline: concatenates the current outer tuple with every
+             * tuple of the frames it receives, or pads the outer tuple with nulls on close
+             * when no frame was received since the last open.
+             */
+            class TupleOuterProduct implements IFrameWriter {
+
+                private boolean smthWasWritten = false;
+                private IHyracksTaskContext hCtx = context.getHyracksContext();
+                private int frameSize = hCtx.getFrameSize();
+                private FrameTupleAccessor ta = new FrameTupleAccessor(frameSize,
+                        pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
+                private ArrayTupleBuilder tb = new ArrayTupleBuilder(nullWriters.length);
+
+                @Override
+                public void open() throws HyracksDataException {
+                    smthWasWritten = false;
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    ta.reset(buffer);
+                    final int innerCount = ta.getTupleCount();
+                    for (int inner = 0; inner < innerCount; inner++) {
+                        if (appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, inner)) {
+                            continue;
+                        }
+                        // output frame is full: ship it and retry on an empty frame
+                        FrameUtils.flushFrame(frame, writer);
+                        appender.reset(frame, true);
+                        if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, inner)) {
+                            throw new IllegalStateException("Could not write frame.");
+                        }
+                    }
+                    smthWasWritten = true;
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    if (smthWasWritten) {
+                        return;
+                    }
+                    // the case when we need to write nulls
+                    appendNullsToTuple();
+                    appendToFrameFromTupleBuilder(tb);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    writer.fail();
+                }
+
+                /** Fills {@code tb} with the outer tuple's fields followed by one null per null writer. */
+                private void appendNullsToTuple() throws HyracksDataException {
+                    tb.reset();
+                    final int outerFields = tRef.getFieldCount();
+                    for (int f = 0; f < outerFields; f++) {
+                        tb.addField(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), f);
+                    }
+                    final DataOutput nullOut = tb.getDataOutput();
+                    for (INullWriter nullWriter : nullWriters) {
+                        nullWriter.writeNull(nullOut);
+                        tb.addFieldEndOffset();
+                    }
+                }
+
+            }
+
+            IFrameWriter endPipe = new TupleOuterProduct();
+
+            NestedTupleSourceRuntime startOfPipeline = (NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, context);
+
+            boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    first = false;
+                    initAccessAppendRef(context);
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int outerCount = tAccess.getTupleCount();
+                for (int outer = 0; outer < outerCount; outer++) {
+                    tRef.reset(tAccess, outer);
+                    // hand the outer tuple to the nested pipeline, then open and close it
+                    startOfPipeline.writeTuple(buffer, outer);
+                    startOfPipeline.open();
+                    startOfPipeline.close();
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
new file mode 100644
index 0000000..3fa3a50
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorter;
+
+/**
+ * Factory of a runtime that buffers the frames it receives in a {@link FrameSorter} and
+ * sorts and emits them when it is closed.
+ */
+public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] sortFields;
+    private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+
+    /**
+     * @param projectionList
+     *            must be {@code null}; projections cannot be pushed into this runtime
+     * @throws NotImplementedException
+     *             if a projection list is given
+     */
+    public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
+        super(projectionList);
+        // Projections are not supported here, so reject them instead of silently ignoring them.
+        if (projectionList != null) {
+            throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
+        }
+        this.sortFields = sortFields;
+        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.comparatorFactories = comparatorFactories;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+            throws AlgebricksException {
+
+        return new AbstractOneInputOneOutputPushRuntime() {
+
+            FrameSorter frameSorter = null;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (frameSorter == null) {
+                    frameSorter = new FrameSorter(context.getHyracksContext(), sortFields, firstKeyNormalizerFactory,
+                            comparatorFactories, outputRecordDesc);
+                }
+                frameSorter.reset();
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                frameSorter.insertFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    frameSorter.sortFrames();
+                    frameSorter.flushFrames(writer);
+                } finally {
+                    // close the downstream writer even if sorting or flushing fails
+                    writer.close();
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
new file mode 100644
index 0000000..8a89b17
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+/**
+ * Factory of the runtime that evaluates expressions over each input tuple and emits a tuple
+ * made of the projected columns, taking assigned columns from the evaluators and all other
+ * columns from the input tuple.
+ */
+public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private int[] outColumns;
+    private IEvaluatorFactory[] evalFactories;
+
+    /**
+     * @param outColumns
+     *            a sorted array of columns into which the result is written to
+     * @param evalFactories
+     *            factories of the evaluators; the evaluator at position k produces column
+     *            {@code outColumns[k]}
+     * @param projectionList
+     *            an array of columns to be projected
+     */
+    public AssignRuntimeFactory(int[] outColumns, IEvaluatorFactory[] evalFactories, int[] projectionList) {
+        super(projectionList);
+        this.outColumns = outColumns;
+        this.evalFactories = evalFactories;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("assign [");
+        String sep = "";
+        for (int column : outColumns) {
+            text.append(sep).append(column);
+            sep = ", ";
+        }
+        text.append("] := [");
+        sep = "";
+        for (IEvaluatorFactory factory : evalFactories) {
+            text.append(sep).append(factory);
+            sep = ", ";
+        }
+        return text.append("]").toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+            throws AlgebricksException {
+        // For each projected column: its position in outColumns, or a negative value when the
+        // column is not assigned and has to be copied from the input tuple.
+        final int projectedCount = projectionList.length;
+        final int[] projectionToOutColumns = new int[projectedCount];
+        for (int p = 0; p < projectedCount; p++) {
+            projectionToOutColumns[p] = Arrays.binarySearch(outColumns, projectionList[p]);
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            private ArrayBackedValueStorage evalOutput = new ArrayBackedValueStorage();
+            private IEvaluator[] eval = new IEvaluator[evalFactories.length];
+            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            private boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    initAccessAppendRef(context);
+                    first = false;
+                    // all evaluators are created over the same evalOutput storage
+                    try {
+                        for (int e = 0; e < evalFactories.length; e++) {
+                            eval[e] = evalFactories[e].createEvaluator(evalOutput);
+                        }
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                for (int idx = 0; idx < tupleCount; idx++) {
+                    tRef.reset(tAccess, idx);
+                    produceTuple(tupleBuilder, tAccess, idx, tRef);
+                    appendToFrameFromTupleBuilder(tupleBuilder);
+                }
+            }
+
+            /**
+             * Rebuilds {@code tb} with one field per projected column: the evaluator output for
+             * assigned columns, the field copied from the input tuple for the others.
+             */
+            private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    FrameTupleReference tupleRef) throws HyracksDataException {
+                tb.reset();
+                for (int f = 0; f < projectionList.length; f++) {
+                    final int evalIdx = projectionToOutColumns[f];
+                    if (evalIdx < 0) {
+                        tb.addField(accessor, tIndex, projectionList[f]);
+                        continue;
+                    }
+                    evalOutput.reset();
+                    try {
+                        eval[evalIdx].evaluate(tupleRef);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    tb.addField(evalOutput.getBytes(), evalOutput.getStartIndex(), evalOutput.getLength());
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
new file mode 100644
index 0000000..659ba74
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSourcePushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Factory of a source runtime that, when opened, emits a single tuple with no fields and
+ * then closes its output.
+ */
+public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public EmptyTupleSourceRuntimeFactory() {
+    }
+
+    @Override
+    public String toString() {
+        return "ets";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(final RuntimeContext context) {
+        return new AbstractOneInputSourcePushRuntime() {
+
+            private IHyracksTaskContext taskCtx = context.getHyracksContext();
+            private ByteBuffer outFrame = taskCtx.allocateFrame();
+            private ArrayTupleBuilder emptyTuple = new ArrayTupleBuilder(0);
+            private FrameTupleAppender outAppender = new FrameTupleAppender(taskCtx.getFrameSize());
+
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+                outAppender.reset(outFrame, true);
+                // emptyTuple is never written to, so this appends a tuple without fields
+                boolean appended = outAppender.append(emptyTuple.getFieldEndOffsets(),
+                        emptyTuple.getByteArray(), 0, emptyTuple.getSize());
+                if (!appended) {
+                    throw new IllegalStateException();
+                }
+                FrameUtils.flushFrame(outFrame, writer);
+                writer.close();
+            }
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
new file mode 100644
index 0000000..899147d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory for {@link NestedTupleSourceRuntime}; prints as {@code nts}.
+ */
+public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IPushRuntime createPushRuntime(RuntimeContext context) {
+        return new NestedTupleSourceRuntime(context);
+    }
+
+    @Override
+    public String toString() {
+        return "nts";
+    }
+
+    /**
+     * Runtime that is fed one tuple at a time through {@link #writeTuple(ByteBuffer, int)} rather than frame by
+     * frame: {@link #nextFrame(ByteBuffer)} always throws.
+     */
+    public static class NestedTupleSourceRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+        /**
+         * @param rc
+         *            runtime context handed to the inherited {@code initAccessAppend}
+         */
+        public NestedTupleSourceRuntime(RuntimeContext rc) {
+            initAccessAppend(rc);
+        }
+
+        /**
+         * Points the inherited tuple accessor at {@code inputBuffer} and hands tuple {@code tIndex} to the
+         * inherited {@code appendTupleToFrame}.
+         *
+         * @param inputBuffer
+         *            frame that contains the tuple
+         * @param tIndex
+         *            index of the tuple inside {@code inputBuffer}
+         */
+        public void writeTuple(ByteBuffer inputBuffer, int tIndex) throws HyracksDataException {
+            tAccess.reset(inputBuffer);
+            appendTupleToFrame(tIndex);
+        }
+
+        /** Only opens the downstream writer. */
+        @Override
+        public void open() throws HyracksDataException {
+            writer.open();
+        }
+
+        /** Forwards the failure to the downstream writer. */
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+
+        /**
+         * Not supported by this runtime; input has to arrive through {@link #writeTuple(ByteBuffer, int)}.
+         *
+         * @throws IllegalStateException
+         *             always
+         */
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            throw new IllegalStateException();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
new file mode 100644
index 0000000..d4040f2
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Factory for a sink runtime whose writer is created over {@code System.out} for the given print columns.
+ */
+public class PrinterRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] printColumns;
+    private final IPrinterFactory[] printerFactories;
+    private final RecordDescriptor inputRecordDesc;
+
+    /**
+     * @param printColumns
+     *            column indexes passed to the writer factory (and listed by {@link #toString()})
+     * @param printerFactories
+     *            printer factories passed to the writer factory
+     * @param inputRecordDesc
+     *            record descriptor of the input
+     */
+    public PrinterRuntimeFactory(int[] printColumns, IPrinterFactory[] printerFactories,
+            RecordDescriptor inputRecordDesc) {
+        this.printColumns = printColumns;
+        this.printerFactories = printerFactories;
+        this.inputRecordDesc = inputRecordDesc;
+    }
+
+    /** Renders as {@code print [c0; c1; ...]}, listing the print column indexes. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("print [");
+        String separator = "";
+        for (int column : printColumns) {
+            text.append(separator).append(column);
+            separator = "; ";
+        }
+        return text.append("]").toString();
+    }
+
+    /**
+     * Builds a printer-based writer on {@code System.out} and wraps it in a {@link SinkWriterRuntime} using the
+     * constructor that takes no {@code autoClose} flag.
+     */
+    @Override
+    public IPushRuntime createPushRuntime(final RuntimeContext context) {
+        IAWriter stdoutWriter = PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out,
+                printerFactories, inputRecordDesc);
+        return new SinkWriterRuntime(stdoutWriter, context, System.out, inputRecordDesc);
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
new file mode 100644
index 0000000..83976c9
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+/**
+ * Factory for a runtime that, for every input tuple, steps running-aggregate functions and writes each result
+ * into its output column, while copying the other projected columns from the input tuple.
+ */
+public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private int[] outColumns;
+    private IRunningAggregateFunctionFactory[] runningAggregates;
+
+    /**
+     * @param outColumns
+     *            a sorted array of columns into which the results are written (it is binary-searched)
+     * @param runningAggregates
+     *            the function factories; entry {@code i} produces the value for {@code outColumns[i]}
+     * @param projectionList
+     *            an array of columns to be projected
+     */
+    public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateFunctionFactory[] runningAggregates,
+            int[] projectionList) {
+        super(projectionList);
+        this.outColumns = outColumns;
+        this.runningAggregates = runningAggregates;
+    }
+
+    /** Renders as {@code running-aggregate [columns] := [functions]}, both lists separated by ", ". */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("running-aggregate [");
+        String separator = "";
+        for (int column : outColumns) {
+            text.append(separator).append(column);
+            separator = ", ";
+        }
+        text.append("] := [");
+        separator = "";
+        for (IRunningAggregateFunctionFactory aggregate : runningAggregates) {
+            text.append(separator).append(aggregate);
+            separator = ", ";
+        }
+        return text.append("]").toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+            throws AlgebricksException {
+        // For each projected column: the index of its aggregate within outColumns, or a negative value when the
+        // column is not an aggregate output and must be copied from the input.
+        final int[] aggregateSlots = new int[projectionList.length];
+        for (int p = 0; p < aggregateSlots.length; p++) {
+            aggregateSlots[p] = Arrays.binarySearch(outColumns, projectionList[p]);
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            private ArrayBackedValueStorage evalOutput = new ArrayBackedValueStorage();
+            private IRunningAggregateFunction[] raggs = new IRunningAggregateFunction[runningAggregates.length];
+            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            private boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                initAccessAppendRef(context);
+                if (first) {
+                    // The aggregate functions are created and initialized on the first open() only.
+                    first = false;
+                    try {
+                        for (int i = 0; i < runningAggregates.length; i++) {
+                            raggs[i] = runningAggregates[i].createRunningAggregateFunction(evalOutput);
+                            raggs[i].init();
+                        }
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                for (int t = 0; t < tupleCount; t++) {
+                    tRef.reset(tAccess, t);
+                    produceTuple(tupleBuilder, tAccess, t, tRef);
+                    appendToFrameFromTupleBuilder(tupleBuilder);
+                }
+            }
+
+            /**
+             * Rebuilds {@code tb} with one field per projected column for tuple {@code tIndex}: aggregate
+             * columns get the value the stepped function wrote to {@code evalOutput}, all others are copied
+             * from {@code accessor}.
+             */
+            private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    FrameTupleReference tupleRef) throws HyracksDataException {
+                tb.reset();
+                for (int f = 0; f < projectionList.length; f++) {
+                    int slot = aggregateSlots[f];
+                    if (slot < 0) {
+                        // Not an aggregate output: copy the input field as is.
+                        tb.addField(accessor, tIndex, projectionList[f]);
+                        continue;
+                    }
+                    evalOutput.reset();
+                    try {
+                        raggs[slot].step(tupleRef);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    tb.addField(evalOutput.getBytes(), evalOutput.getStartIndex(), evalOutput.getLength());
+                }
+            }
+
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
new file mode 100644
index 0000000..56a3f0e
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Sink runtime that hands every tuple of each incoming frame to an {@link IAWriter}.
+ * <p>
+ * The print stream given at construction is closed by {@link #close()} only when {@code autoClose} was
+ * requested; otherwise it is flushed and left open.
+ */
+public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
+
+    private final RuntimeContext context;
+    private final PrintStream printStream;
+    private final IAWriter writer;
+    private RecordDescriptor inputRecordDesc;
+    private FrameTupleAccessor tAccess;
+    private boolean autoClose = false;
+    private boolean first = true;
+
+    /**
+     * Creates a sink that does not close {@code printStream} when the runtime is closed.
+     *
+     * @param writer
+     *            receives each tuple through {@code printTuple}
+     * @param context
+     *            supplies the frame size for the tuple accessor
+     * @param printStream
+     *            stream that is flushed (or closed, see the other constructor) on {@link #close()}
+     * @param inputRecordDesc
+     *            record descriptor of the incoming frames
+     */
+    public SinkWriterRuntime(IAWriter writer, RuntimeContext context, PrintStream printStream,
+            RecordDescriptor inputRecordDesc) {
+        this.writer = writer;
+        this.context = context;
+        this.printStream = printStream;
+        this.inputRecordDesc = inputRecordDesc;
+        this.tAccess = new FrameTupleAccessor(context.getHyracksContext().getFrameSize(), inputRecordDesc);
+    }
+
+    /**
+     * Same as the four-argument constructor, with explicit control over stream ownership.
+     *
+     * @param autoClose
+     *            {@code true} if {@link #close()} should close {@code printStream}
+     */
+    public SinkWriterRuntime(IAWriter writer, RuntimeContext context, PrintStream printStream,
+            RecordDescriptor inputRecordDesc, boolean autoClose) {
+        this(writer, context, printStream, inputRecordDesc);
+        this.autoClose = autoClose;
+    }
+
+    /**
+     * On the first call only, rebuilds the tuple accessor and initializes the writer; later calls do nothing.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        if (first) {
+            first = false;
+            // The descriptor may have been replaced through setInputRecordDescriptor() since construction.
+            tAccess = new FrameTupleAccessor(context.getHyracksContext().getFrameSize(), inputRecordDesc);
+            try {
+                writer.init();
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    /** Prints every tuple of {@code buffer} through the writer, in frame order. */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            try {
+                writer.printTuple(tAccess, t);
+            } catch (AlgebricksException ae) {
+                throw new HyracksDataException(ae);
+            }
+        }
+    }
+
+    /**
+     * Closes the print stream when this runtime owns it ({@code autoClose}); otherwise flushes it, so that
+     * output still sitting in a buffered stream is pushed out even though the stream stays open.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        if (autoClose) {
+            printStream.close();
+        } else {
+            printStream.flush();
+        }
+    }
+
+    /** Replaces the input record descriptor; {@code index} is ignored. Takes effect at the first open(). */
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.inputRecordDesc = recordDescriptor;
+    }
+
+    /** No-op: this sink takes no action on failure. */
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
new file mode 100644
index 0000000..0641b0c
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SinkWriterRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] fields;
+    private final IPrinterFactory[] printerFactories;
+    private final File outputFile;
+    private final RecordDescriptor inputRecordDesc;
+    private final IAWriterFactory writerFactory;
+
+    public SinkWriterRuntimeFactory(int[] fields, IPrinterFactory[] printerFactories, File outputFile,
+            IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc) {
+        this.fields = fields;
+        this.printerFactories = printerFactories;
+        this.outputFile = outputFile;
+        this.writerFactory = writerFactory;
+        this.inputRecordDesc = inputRecordDesc;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        buf.append("sink-write " + "[");
+        for (int i = 0; i < fields.length; i++) {
+            if (i > 0) {
+                buf.append("; ");
+            }
+            buf.append(fields[i]);
+        }
+        buf.append("] outputFile");
+        return buf.toString();
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException {
+        PrintStream filePrintStream = null;
+        try {
+            filePrintStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
+        } catch (FileNotFoundException e) {
+            throw new AlgebricksException(e);
+        }
+        IAWriter w = writerFactory.createWriter(fields, filePrintStream, printerFactories, inputRecordDesc);
+        return new SinkWriterRuntime(w, context, filePrintStream, inputRecordDesc, true);
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
new file mode 100644
index 0000000..8a12840
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
@@ -0,0 +1,98 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+
+/**
+ * Factory for a runtime that forwards a number of tuples and then fails on purpose by throwing a
+ * {@link HyracksDataException} with the message "injected failure".
+ */
+public class StreamDieRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    // The misspelling is kept on purpose: this is a field of a class that declares a serialVersionUID, so
+    // its name is presumably part of the serialized form.
+    private IEvaluatorFactory aftterObjectsEvalFactory;
+    private IBinaryIntegerInspector binaryIntegerInspector;
+
+    /**
+     * @param maxObjectsEvalFactory
+     *            creates the evaluator that yields how many tuples are forwarded before the failure
+     * @param projectionList
+     *            columns to project, or {@code null} to forward whole tuples
+     * @param binaryIntegerInspector
+     *            decodes the evaluator's binary output into an {@code int}
+     */
+    public StreamDieRuntimeFactory(IEvaluatorFactory maxObjectsEvalFactory, int[] projectionList,
+            IBinaryIntegerInspector binaryIntegerInspector) {
+        super(projectionList);
+        this.aftterObjectsEvalFactory = maxObjectsEvalFactory;
+        this.binaryIntegerInspector = binaryIntegerInspector;
+    }
+
+    @Override
+    public String toString() {
+        return "stream-die " + aftterObjectsEvalFactory.toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context) {
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            private IEvaluator evalAfterObjects;
+            private ArrayBackedValueStorage evalOutput;
+            // Tuples still to forward; negative until the count has been evaluated.
+            private int toWrite = -1;
+
+            @Override
+            public void open() throws HyracksDataException {
+                // The evaluator doubles as the "already initialized" marker.
+                if (evalAfterObjects == null) {
+                    initAccessAppendRef(context);
+                    evalOutput = new ArrayBackedValueStorage();
+                    try {
+                        evalAfterObjects = aftterObjectsEvalFactory.createEvaluator(evalOutput);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                if (toWrite < 0) {
+                    // The count is evaluated against tuple 0 of the current frame.
+                    toWrite = evaluateInteger(evalAfterObjects, 0);
+                }
+                for (int t = 0; t < tupleCount; t++) {
+                    if (toWrite <= 0) {
+                        throw new HyracksDataException("injected failure");
+                    }
+                    toWrite--;
+                    forward(t);
+                }
+            }
+
+            /** Appends tuple {@code t} to the output, projected when a projection list was given. */
+            private void forward(int t) throws HyracksDataException {
+                if (projectionList == null) {
+                    appendTupleToFrame(t);
+                } else {
+                    appendProjectionToFrame(t, projectionList);
+                }
+            }
+
+            /** Runs {@code eval} on tuple {@code tIdx} of the current frame and decodes its output as an int. */
+            private int evaluateInteger(IEvaluator eval, int tIdx) throws HyracksDataException {
+                tRef.reset(tAccess, tIdx);
+                evalOutput.reset();
+                try {
+                    eval.evaluate(tRef);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+                return binaryIntegerInspector.getIntegerValue(evalOutput.getBytes(), 0, evalOutput.getLength());
+            }
+
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
new file mode 100644
index 0000000..5a23b98
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -0,0 +1,135 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+
+/**
+ * Factory for a runtime that forwards at most a limited number of tuples, optionally after skipping an offset.
+ * Both numbers come from evaluators that are run against the first tuple the runtime iterates over.
+ */
+public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private IEvaluatorFactory maxObjectsEvalFactory;
+    private IEvaluatorFactory offsetEvalFactory;
+    private IBinaryIntegerInspector binaryIntegerInspector;
+
+    /**
+     * @param maxObjectsEvalFactory
+     *            creates the evaluator that yields the maximum number of tuples to forward
+     * @param offsetEvalFactory
+     *            creates the evaluator that yields the number of tuples to skip first; may be {@code null}
+     * @param projectionList
+     *            columns to project, or {@code null} to forward whole tuples
+     * @param binaryIntegerInspector
+     *            decodes an evaluator's binary output into an {@code int}
+     */
+    public StreamLimitRuntimeFactory(IEvaluatorFactory maxObjectsEvalFactory, IEvaluatorFactory offsetEvalFactory,
+            int[] projectionList, IBinaryIntegerInspector binaryIntegerInspector) {
+        super(projectionList);
+        this.maxObjectsEvalFactory = maxObjectsEvalFactory;
+        this.offsetEvalFactory = offsetEvalFactory;
+        this.binaryIntegerInspector = binaryIntegerInspector;
+    }
+
+    /** Renders as {@code stream-limit <max>} followed by {@code , <offset>} when an offset is configured. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("stream-limit ");
+        text.append(maxObjectsEvalFactory.toString());
+        if (offsetEvalFactory != null) {
+            text.append(", ").append(offsetEvalFactory.toString());
+        }
+        return text.toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context) {
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            private IEvaluator evalMaxObjects;
+            private ArrayBackedValueStorage evalOutput;
+            private IEvaluator evalOffset = null;
+            private int toWrite = 0; // how many tuples still to write
+            private int toSkip = 0; // how many tuples still to skip
+            private boolean firstTuple = true;
+            private boolean afterLastTuple = false;
+
+            @Override
+            public void open() throws HyracksDataException {
+                // The evaluator doubles as the "already initialized" marker.
+                if (evalMaxObjects == null) {
+                    initAccessAppendRef(context);
+                    evalOutput = new ArrayBackedValueStorage();
+                    try {
+                        evalMaxObjects = maxObjectsEvalFactory.createEvaluator(evalOutput);
+                        if (offsetEvalFactory != null) {
+                            evalOffset = offsetEvalFactory.createEvaluator(evalOutput);
+                        }
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+                // NOTE(review): only this flag is reset here; firstTuple, toWrite and toSkip keep their values
+                // across open() calls -- confirm that is intended if a runtime can be opened more than once.
+                afterLastTuple = false;
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                if (afterLastTuple) {
+                    // The limit has been reached: ignore the data.
+                    return;
+                }
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                if (tupleCount <= toSkip) {
+                    // The whole frame lies inside the offset.
+                    toSkip -= tupleCount;
+                    return;
+                }
+                int start = 0;
+                if (toSkip > 0) {
+                    // Jump over the rest of the offset that falls into this frame.
+                    start = toSkip;
+                    toSkip = 0;
+                }
+                for (int t = start; t < tupleCount; t++) {
+                    if (firstTuple) {
+                        // Limit and offset are evaluated once, against this tuple.
+                        firstTuple = false;
+                        toWrite = evaluateInteger(evalMaxObjects, t);
+                        if (evalOffset != null) {
+                            toSkip = evaluateInteger(evalOffset, t);
+                        }
+                    }
+                    if (toSkip > 0) {
+                        toSkip--;
+                        continue;
+                    }
+                    if (toWrite <= 0) {
+                        afterLastTuple = true;
+                        return;
+                    }
+                    toWrite--;
+                    forward(t);
+                }
+            }
+
+            /** Appends tuple {@code t} to the output, projected when a projection list was given. */
+            private void forward(int t) throws HyracksDataException {
+                if (projectionList == null) {
+                    appendTupleToFrame(t);
+                } else {
+                    appendProjectionToFrame(t, projectionList);
+                }
+            }
+
+            /** Runs {@code eval} on tuple {@code tIdx} of the current frame and decodes its output as an int. */
+            private int evaluateInteger(IEvaluator eval, int tIdx) throws HyracksDataException {
+                tRef.reset(tAccess, tIdx);
+                evalOutput.reset();
+                try {
+                    eval.evaluate(tRef);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+                return binaryIntegerInspector.getIntegerValue(evalOutput.getBytes(), 0, evalOutput.getLength());
+            }
+
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
new file mode 100644
index 0000000..e452059
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Runtime factory for the "stream-project" operator: every tuple of an input
+ * frame is re-emitted through appendProjectionToFrame with the projection list.
+ */
+public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param projectionList
+     *            field positions handed to the base factory and used for every tuple
+     */
+    public StreamProjectRuntimeFactory(int[] projectionList) {
+        super(projectionList);
+    }
+
+    @Override
+    public String toString() {
+        return "stream-project " + Arrays.toString(projectionList);
+    }
+
+    /**
+     * Builds the push runtime that applies the projection frame by frame.
+     */
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+            throws AlgebricksException {
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            // set on the first open(), right before initAccessAppend(context) is invoked
+            private boolean initialized = false;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (!initialized) {
+                    initialized = true;
+                    initAccessAppend(context);
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                for (int tupleIndex = 0; tupleIndex < tupleCount; tupleIndex++) {
+                    appendProjectionToFrame(tupleIndex, projectionList);
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
new file mode 100644
index 0000000..1e4f4cc
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+
+/**
+ * Runtime factory for the "stream-select" operator: a tuple is forwarded only
+ * when the output of the condition evaluator is read as true by the boolean
+ * inspector; forwarded tuples are projected when a projection list is given.
+ */
+public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IEvaluatorFactory cond;
+
+    private final IBinaryBooleanInspector binaryBooleanInspector;
+
+    /**
+     * @param cond
+     *            factory of the evaluator that computes the selection condition
+     * @param projectionList
+     *            if projectionList is null, then no projection is performed
+     * @param binaryBooleanInspector
+     *            turns the bytes written by the evaluator into a boolean
+     */
+    public StreamSelectRuntimeFactory(IEvaluatorFactory cond, int[] projectionList,
+            IBinaryBooleanInspector binaryBooleanInspector) {
+        super(projectionList);
+        this.cond = cond;
+        this.binaryBooleanInspector = binaryBooleanInspector;
+    }
+
+    @Override
+    public String toString() {
+        return "stream-select " + cond.toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context) {
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            // storage the condition evaluator is created with; reset before each evaluation
+            private ArrayBackedValueStorage evalOutput;
+            // created by the first open(); while it is null the runtime is not set up yet
+            private IEvaluator eval;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (eval == null) {
+                    initAccessAppendRef(context);
+                    evalOutput = new ArrayBackedValueStorage();
+                    try {
+                        eval = cond.createEvaluator(evalOutput);
+                    } catch (AlgebricksException cause) {
+                        throw new HyracksDataException(cause);
+                    }
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                for (int idx = 0; idx < tupleCount; idx++) {
+                    if (!satisfiesCondition(idx)) {
+                        continue;
+                    }
+                    if (projectionList == null) {
+                        appendTupleToFrame(idx);
+                    } else {
+                        appendProjectionToFrame(idx, projectionList);
+                    }
+                }
+            }
+
+            // Evaluates the condition on tuple idx of tAccess and decodes the outcome.
+            private boolean satisfiesCondition(int idx) throws HyracksDataException {
+                tRef.reset(tAccess, idx);
+                evalOutput.reset();
+                try {
+                    eval.evaluate(tRef);
+                } catch (AlgebricksException cause) {
+                    throw new HyracksDataException(cause);
+                }
+                return binaryBooleanInspector.getBooleanValue(evalOutput.getBytes(), 0, evalOutput.getLength());
+            }
+
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
new file mode 100644
index 0000000..cfc21ad
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IPrinter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+/**
+ * Runtime factory that pipes tuples through an external command: each input
+ * tuple is printed as one delimited text line to the command's standard input,
+ * the command's standard output is handed to a tuple parser that pushes to the
+ * downstream writer, and its standard error is copied to System.err.
+ */
+public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String command;
+    private final IPrinterFactory[] printerFactories;
+    private final char fieldDelimiter;
+    private final ITupleParserFactory parserFactory;
+
+    /**
+     * @param command
+     *            command line handed to {@link Runtime#exec(String)} on every open()
+     * @param printerFactories
+     *            one printer factory per input field, in field order
+     * @param fieldDelimiter
+     *            character written after every printed field
+     * @param parserFactory
+     *            creates the parser that reads the command's standard output
+     */
+    public StringStreamingRuntimeFactory(String command, IPrinterFactory[] printerFactories, char fieldDelimiter,
+            ITupleParserFactory parserFactory) {
+        super(null);
+        this.command = command;
+        this.printerFactories = printerFactories;
+        this.fieldDelimiter = fieldDelimiter;
+        this.parserFactory = parserFactory;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+            throws AlgebricksException {
+        final IPrinter[] printers = new IPrinter[printerFactories.length];
+        for (int i = 0; i < printerFactories.length; i++) {
+            printers[i] = printerFactories[i].createPrinter();
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            /** Feeds the command's standard output to the parser, which pushes to the writer. */
+            final class ForwardScriptOutput implements Runnable {
+
+                private final InputStream inStream;
+                private final ITupleParser parser;
+
+                public ForwardScriptOutput(ITupleParser parser, InputStream inStream) {
+                    this.parser = parser;
+                    this.inStream = inStream;
+                }
+
+                @Override
+                public void run() {
+                    try {
+                        parser.parse(inStream, writer);
+                    } catch (HyracksDataException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        try {
+                            inStream.close();
+                        } catch (Exception ignored) {
+                            // best effort: the stream is not used again once parsing is over
+                        }
+                    }
+                }
+            }
+
+            /** Copies a stream line by line to a print stream that it does not own. */
+            final class DumpInStreamToPrintStream implements Runnable {
+
+                private final BufferedReader reader;
+                private final PrintStream printStream;
+
+                public DumpInStreamToPrintStream(InputStream inStream, PrintStream printStream) {
+                    this.reader = new BufferedReader(new InputStreamReader(inStream));
+                    this.printStream = printStream;
+                }
+
+                @Override
+                public void run() {
+                    String s;
+                    try {
+                        while ((s = reader.readLine()) != null) {
+                            printStream.println(s);
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        try {
+                            reader.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        // The target is the shared System.err (see open()): closing it
+                        // would silence stderr for the whole JVM, so it is only flushed.
+                        printStream.flush();
+                    }
+                }
+
+            }
+
+            private Process process;
+            private PrintStream ps;
+            private boolean first = true;
+            private Thread outputPipe;
+            private Thread dumpStderr;
+
+            /** Starts the command plus one thread per output stream (stdout parser, stderr dump). */
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    first = false;
+                    initAccessAppendRef(context);
+                }
+
+                try {
+                    ITupleParser parser = parserFactory.createTupleParser(context.getHyracksContext());
+                    process = Runtime.getRuntime().exec(command);
+                    ps = new PrintStream(process.getOutputStream());
+                    ForwardScriptOutput fso = new ForwardScriptOutput(parser, process.getInputStream());
+                    outputPipe = new Thread(fso);
+                    outputPipe.start();
+                    DumpInStreamToPrintStream disps = new DumpInStreamToPrintStream(process.getErrorStream(),
+                            System.err);
+                    dumpStderr = new Thread(disps);
+                    dumpStderr.start();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /** Prints every tuple of the frame as one text line to the command's standard input. */
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    for (int i = 0; i < printers.length; i++) {
+                        try {
+                            printers[i].print(buffer.array(), tRef.getFieldStart(i), tRef.getFieldLength(i), ps);
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                        // the delimiter follows every field, the last one included
+                        ps.print(fieldDelimiter);
+                        if (i == printers.length - 1) {
+                            ps.print('\n');
+                        }
+                    }
+                }
+            }
+
+            /** Ends the command's input, waits for it and both threads, then closes downstream. */
+            @Override
+            public void close() throws HyracksDataException {
+                // first close the printer printing to the process
+                ps.close();
+                int ret = 0;
+                // then wait for the process to finish
+                try {
+                    ret = process.waitFor();
+                    outputPipe.join();
+                    dumpStderr.join();
+                } catch (InterruptedException e) {
+                    // keep the interruption visible to whoever runs this operator
+                    Thread.currentThread().interrupt();
+                    throw new HyracksDataException(e);
+                }
+                if (ret != 0) {
+                    throw new HyracksDataException("Process exit value: " + ret);
+                }
+                // close the following operator in the chain
+                super.close();
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
new file mode 100644
index 0000000..e32b023
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+
+/**
+ * Runtime factory for the "unnest" operator: for every input tuple the
+ * unnesting function is initialized and then stepped until step() returns
+ * false; each successful step produces exactly one output tuple.
+ */
+public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int outCol;
+    private final IUnnestingFunctionFactory unnestingFactory;
+    private int outColPos;
+    private final boolean outColIsProjected;
+
+    // Each time step() is called on the aggregate, a new value is written in
+    // its output. One byte is written before that value and is neglected.
+    // By convention, if the aggregate function writes nothing, it means it
+    // produced the last value.
+
+    /**
+     * @param outCol
+     *            column that receives the unnested value
+     * @param unnestingFactory
+     *            creates the function that is stepped for every input tuple
+     * @param projectionList
+     *            output columns; must not be null because its length is read here
+     */
+    public UnnestRuntimeFactory(int outCol, IUnnestingFunctionFactory unnestingFactory, int[] projectionList) {
+        super(projectionList);
+        this.outCol = outCol;
+        this.unnestingFactory = unnestingFactory;
+        outColPos = -1;
+        for (int f = 0; f < projectionList.length; f++) {
+            if (projectionList[f] == outCol) {
+                outColPos = f;
+            }
+        }
+        outColIsProjected = outColPos >= 0;
+    }
+
+    @Override
+    public String toString() {
+        return "unnest " + outCol + " <- " + unnestingFactory;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+            throws AlgebricksException {
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            private ArrayBackedValueStorage evalOutput;
+            private IUnnestingFunction agg;
+            private ArrayTupleBuilder tupleBuilder;
+
+            @Override
+            public void open() throws HyracksDataException {
+                initAccessAppendRef(context);
+                evalOutput = new ArrayBackedValueStorage();
+                try {
+                    agg = unnestingFactory.createUnnestingFunction(evalOutput);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+                tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    try {
+                        agg.init(tRef);
+                        boolean goon = true;
+                        do {
+                            tupleBuilder.reset();
+                            evalOutput.reset();
+                            if (!agg.step()) {
+                                goon = false;
+                            } else if (!outColIsProjected) {
+                                // The unnested value is not part of the output: emit the plain
+                                // projection only. tupleBuilder was just reset and holds no
+                                // field, so it must not be appended as an extra tuple.
+                                appendProjectionToFrame(t, projectionList);
+                            } else {
+                                // NOTE(review): input fields are copied by position f, not by
+                                // projectionList[f] - confirm the projection is always positional.
+                                for (int f = 0; f < outColPos; f++) {
+                                    tupleBuilder.addField(tAccess, t, f);
+                                }
+                                tupleBuilder.addField(evalOutput.getBytes(), evalOutput.getStartIndex(),
+                                        evalOutput.getLength());
+                                for (int f = outColPos + 1; f < projectionList.length; f++) {
+                                    tupleBuilder.addField(tAccess, t, f);
+                                }
+                                appendToFrameFromTupleBuilder(tupleBuilder);
+                            }
+                        } while (goon);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+            }
+        };
+    }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
new file mode 100644
index 0000000..9c53241
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
@@ -0,0 +1,61 @@
+package edu.uci.ics.hyracks.algebricks.runtime.writers;
+
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Creates writers that print the selected fields of a tuple on one line,
+ * using the i-th printer for the i-th selected field and "; " between fields.
+ */
+public class PrinterBasedWriterFactory implements IAWriterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final PrinterBasedWriterFactory INSTANCE = new PrinterBasedWriterFactory();
+
+    public PrinterBasedWriterFactory() {
+    }
+
+    @Override
+    public IAWriter createWriter(final int[] fields, final PrintStream printStream,
+            final IPrinterFactory[] printerFactories, RecordDescriptor inputRecordDescriptor) {
+        // one printer instance per factory, created eagerly for this writer
+        final IPrinter[] printers = new IPrinter[printerFactories.length];
+        for (int p = 0; p < printerFactories.length; p++) {
+            printers[p] = printerFactories[p].createPrinter();
+        }
+
+        return new IAWriter() {
+
+            @Override
+            public void init() throws AlgebricksException {
+                for (IPrinter printer : printers) {
+                    printer.init();
+                }
+            }
+
+            @Override
+            public void printTuple(IFrameTupleAccessor tAccess, int tIdx) throws AlgebricksException {
+                for (int pos = 0; pos < fields.length; pos++) {
+                    final int field = fields[pos];
+                    final int tupleStart = tAccess.getTupleStartOffset(tIdx);
+                    final int slotsLength = tAccess.getFieldSlotsLength();
+                    final int offset = tupleStart + slotsLength + tAccess.getFieldStartOffset(tIdx, field);
+                    final int length = tAccess.getFieldLength(tIdx, field);
+                    if (pos != 0) {
+                        printStream.print("; ");
+                    }
+                    printers[pos].print(tAccess.getBuffer().array(), offset, length, printStream);
+                }
+                printStream.println();
+            }
+        };
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
new file mode 100644
index 0000000..61f822e
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
@@ -0,0 +1,61 @@
+package edu.uci.ics.hyracks.algebricks.runtime.writers;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Creates writers that emit the Java-serialized record descriptor once and
+ * then the raw bytes of the selected fields of every tuple.
+ */
+public class SerializedDataWriterFactory implements IAWriterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IAWriter createWriter(final int[] fields, final PrintStream ps, IPrinterFactory[] printerFactories,
+            final RecordDescriptor inputRecordDescriptor) {
+        return new IAWriter() {
+
+            /**
+             * Serializes the input record descriptor in memory and copies the bytes to ps.
+             */
+            @Override
+            public void init() throws AlgebricksException {
+                try {
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    ObjectOutputStream oos = new ObjectOutputStream(baos);
+                    oos.writeObject(inputRecordDescriptor);
+                    // the object stream may buffer: push everything into baos before
+                    // its content is copied, instead of relying on implicit draining
+                    oos.flush();
+                    baos.writeTo(ps);
+                    oos.close();
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+
+            /**
+             * Writes the bytes of each selected field of tuple tIdx to ps, with no separator.
+             */
+            @Override
+            public void printTuple(IFrameTupleAccessor tAccess, int tIdx) throws AlgebricksException {
+                for (int i = 0; i < fields.length; i++) {
+                    int fldStart = tAccess.getTupleStartOffset(tIdx) + tAccess.getFieldSlotsLength()
+                            + tAccess.getFieldStartOffset(tIdx, fields[i]);
+                    int fldLen = tAccess.getFieldLength(tIdx, fields[i]);
+                    ps.write(tAccess.getBuffer().array(), fldStart, fldLen);
+                }
+            }
+        };
+    }
+}