Added onlineaggregation example

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_online_aggregation@195 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/onlineaggregation-example/.project b/hyracks-examples/onlineaggregation-example/.project
new file mode 100644
index 0000000..04d3146
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>onlineaggregation-example</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks-examples/onlineaggregation-example/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/onlineaggregation-example/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..53f6f77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Nov 11 13:12:17 PST 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.classpath b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.classpath
new file mode 100644
index 0000000..3f62785
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.classpath
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.4"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.project b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.project
new file mode 100644
index 0000000..4c98f7a
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>onlineaggregationapp</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..5ee0b93
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Thu Nov 11 13:12:17 PST 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.4
+org.eclipse.jdt.core.compiler.compliance=1.4
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.4
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..53f6f77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Nov 11 13:12:17 PST 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/pom.xml
new file mode 100644
index 0000000..6e95f3c
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/pom.xml
@@ -0,0 +1,58 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+  <artifactId>onlineaggregationapp</artifactId>
+  <version>0.1.3-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks.examples</groupId>
+    <artifactId>onlineaggregation-example</artifactId>
+    <version>0.1.3-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>target/application/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2-beta-5</version>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/app-assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+  		<artifactId>onlineaggregationhelper</artifactId>
+  		<version>0.1.3-SNAPSHOT</version>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationapp/src/main/assembly/app-assembly.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/src/main/assembly/app-assembly.xml
new file mode 100644
index 0000000..43ace6c
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationapp/src/main/assembly/app-assembly.xml
@@ -0,0 +1,13 @@
+<assembly>
+  <id>app-assembly</id>
+  <formats>
+    <format>zip</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/application/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.classpath b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.project b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.project
new file mode 100644
index 0000000..38c4876
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>onlineaggregationclient</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..96d3d9f
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Thu Nov 11 13:12:17 PST 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..53f6f77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Nov 11 13:12:17 PST 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
new file mode 100644
index 0000000..ec9b303
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
@@ -0,0 +1,80 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+  <artifactId>onlineaggregationclient</artifactId>
+  <version>0.1.3-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks.examples</groupId>
+    <artifactId>onlineaggregation-example</artifactId>
+    <version>0.1.3-SNAPSHOT</version>
+  </parent>
+
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-std</artifactId>
+  		<version>0.1.3-SNAPSHOT</version>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+  		<artifactId>onlineaggregationhelper</artifactId>
+  		<version>0.1.3-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <configuration>
+              <programs>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountMain</mainClass>
+                  <name>onlineaggregationclient</name>
+                </program>
+              </programs>
+              <repositoryLayout>flat</repositoryLayout>
+              <repositoryName>lib</repositoryName>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>assemble</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2-beta-5</version>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/assembly/binary-assembly.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.classpath b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.project b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.project
new file mode 100644
index 0000000..832bfd9
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>onlineaggregationhelper</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..96d3d9f
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Thu Nov 11 13:12:17 PST 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..53f6f77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Thu Nov 11 13:12:17 PST 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/pom.xml
new file mode 100644
index 0000000..3c8dab4
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/pom.xml
@@ -0,0 +1,47 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks.examples.onlineaggregation</groupId>
+  <artifactId>onlineaggregationhelper</artifactId>
+  <version>0.1.3-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks.examples</groupId>
+    <artifactId>onlineaggregation-example</artifactId>
+    <version>0.1.3-SNAPSHOT</version>
+  </parent>
+
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-std</artifactId>
+  		<version>0.1.3-SNAPSHOT</version>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-api</artifactId>
+  		<version>0.1.3-SNAPSHOT</version>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-hadoop</artifactId>
+  		<version>0.1.3-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
new file mode 100644
index 0000000..a16c63d
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+
+public class HadoopHelper {
+    public static final int KEY_FIELD_INDEX = 0;
+    public static final int VALUE_FIELD_INDEX = 1;
+    public static final int BLOCKID_FIELD_INDEX = 2;
+    private static final int[] KEY_SORT_FIELDS = new int[] { 0 };
+
+    private MarshalledWritable<Configuration> mConfig;
+    private Configuration config;
+    private Job job;
+
+    public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+        this.mConfig = mConfig;
+        try {
+            config = mConfig.get();
+            job = new Job(config);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException {
+        try {
+            return new RecordDescriptor(
+                    new ISerializerDeserializer[] {
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputKeyClass()),
+                            DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                                    .getMapOutputValueClass()), IntegerSerializerDeserializer.INSTANCE });
+
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
+        try {
+            return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        } catch (InstantiationException e) {
+            throw new HyracksDataException(e);
+        } catch (IllegalAccessException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K2, V2, K3, V3> Reducer<K2, V2, K3, V3> getReducer() throws HyracksDataException {
+        try {
+            return (Reducer<K2, V2, K3, V3>) HadoopTools.newInstance(job.getReducerClass());
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        } catch (InstantiationException e) {
+            throw new HyracksDataException(e);
+        } catch (IllegalAccessException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K, V> InputFormat<K, V> getInputFormat() throws HyracksDataException {
+        try {
+            return (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), config);
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public IBinaryComparatorFactory[] getSortComparatorFactories() {
+        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+                .getSortComparator().getClass());
+
+        return new IBinaryComparatorFactory[] { comparatorFactory };
+    }
+
+    public IBinaryComparatorFactory[] getGroupingComparatorFactories() {
+        WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
+                .getGroupingComparator().getClass());
+
+        return new IBinaryComparatorFactory[] { comparatorFactory };
+    }
+
+    public RawComparator<?> getRawGroupingComparator() {
+        return job.getGroupingComparator();
+    }
+
+    public int getSortFrameLimit(IHyracksContext ctx) {
+        int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100);
+        return (int) (((long) sortMemory * 1024 * 1024) / ctx.getFrameSize());
+    }
+
+    public Job getJob() {
+        return job;
+    }
+
+    public MarshalledWritable<Configuration> getMarshalledConfiguration() {
+        return mConfig;
+    }
+
+    public Configuration getConfiguration() {
+        return config;
+    }
+
+    public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException {
+        int nReducers = job.getNumReduceTasks();
+        try {
+            return new HadoopNewPartitionerTuplePartitionComputerFactory<Writable, Writable>(
+                    (Class<? extends Partitioner<Writable, Writable>>) job.getPartitionerClass(),
+                    (ISerializerDeserializer<Writable>) DatatypeHelper
+                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputKeyClass()),
+                    (ISerializerDeserializer<Writable>) DatatypeHelper
+                            .createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputValueClass()));
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public int[] getSortFields() {
+        return KEY_SORT_FIELDS;
+    }
+
+    public <K> ISerializerDeserializer<K> getMapOutputKeySerializerDeserializer() {
+        return (ISerializerDeserializer<K>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                .getMapOutputKeyClass());
+    }
+
+    public <V> ISerializerDeserializer<V> getMapOutputValueSerializerDeserializer() {
+        return (ISerializerDeserializer<V>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
+                .getMapOutputValueClass());
+    }
+
+    public FileSystem getFilesystem() throws HyracksDataException {
+        try {
+            return FileSystem.get(config);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public <K, V> OutputFormat<K, V> getOutputFormat() throws HyracksDataException {
+        try {
+            return (OutputFormat<K, V>) ReflectionUtils.newInstance(job.getOutputFormatClass(), config);
+        } catch (ClassNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopTools.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopTools.java
new file mode 100644
index 0000000..f32a9f7
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopTools.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+public class HadoopTools {
+    public static Object newInstance(String className) throws ClassNotFoundException, InstantiationException,
+            IllegalAccessException {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(HadoopTools.class.getClassLoader());
+            Class<?> clazz = Class.forName(className, true, HadoopTools.class.getClassLoader());
+            return newInstance(clazz);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public static Object newInstance(Class<?> clazz) throws InstantiationException, IllegalAccessException {
+        return clazz.newInstance();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HashPartitioningShuffleConnectorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HashPartitioningShuffleConnectorDescriptor.java
new file mode 100644
index 0000000..b36b371
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HashPartitioningShuffleConnectorDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.HashDataWriter;
+
+public class HashPartitioningShuffleConnectorDescriptor extends AbstractConnectorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final MarshalledWritable<Configuration> mConfig;
+
+    public HashPartitioningShuffleConnectorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> mConfig) {
+        super(spec);
+        this.mConfig = mConfig;
+    }
+
+    @Override
+    public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        HadoopHelper helper = new HadoopHelper(mConfig);
+        ITuplePartitionComputerFactory tpcf = helper.getTuplePartitionComputer();
+        return new HashDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
+    }
+
+    @Override
+    public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        return new ShuffleFrameReader(ctx, demux, mConfig);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java
new file mode 100644
index 0000000..6c8c8e9
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public interface IInputSplitProvider {
+    public boolean next();
+
+    public InputSplit getInputSplit();
+
+    public int getBlockId();
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
new file mode 100644
index 0000000..dad5ec5
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+public interface IInputSplitProviderFactory {
+    public IInputSplitProvider createInputSplitProvider();
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
new file mode 100644
index 0000000..10f343f
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+
+public class MapperOperatorDescriptor<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+        extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final MarshalledWritable<Configuration> config;
+    private final IInputSplitProviderFactory isProviderFactory;
+
+    public MapperOperatorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> config,
+            IInputSplitProviderFactory isProviderFactory) throws HyracksDataException {
+        super(spec, 0, 1);
+        this.config = config;
+        this.isProviderFactory = isProviderFactory;
+        HadoopHelper helper = new HadoopHelper(config);
+        recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        HadoopHelper helper = new HadoopHelper(config);
+        final Configuration conf = helper.getConfiguration();
+        final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
+        final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
+        final IInputSplitProvider isp = isProviderFactory.createInputSplitProvider();
+        final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(conf, null);
+
+        final int framesLimit = helper.getSortFrameLimit(ctx);
+        final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
+
+        class SortingRecordWriter extends RecordWriter<K2, V2> {
+            private final ArrayTupleBuilder tb;
+            private final ByteBuffer frame;
+            private final FrameTupleAppender fta;
+            private ExternalSortRunGenerator runGen;
+            private int blockId;
+
+            public SortingRecordWriter() throws HyracksDataException {
+                tb = new ArrayTupleBuilder(3);
+                frame = ctx.getResourceManager().allocateFrame();
+                fta = new FrameTupleAppender(ctx);
+                fta.reset(frame, true);
+            }
+
+            public void initBlock(int blockId) {
+                runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
+                        recordDescriptors[0], framesLimit);
+                this.blockId = blockId;
+            }
+
+            @Override
+            public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+                if (fta.getTupleCount() > 0) {
+                    runGen.nextFrame(frame);
+                    fta.reset(frame, true);
+                }
+            }
+
+            @Override
+            public void write(K2 key, V2 value) throws IOException, InterruptedException {
+                DataOutput dos = tb.getDataOutput();
+                tb.reset();
+                key.write(dos);
+                tb.addFieldEndOffset();
+                value.write(dos);
+                tb.addFieldEndOffset();
+                dos.writeInt(blockId);
+                tb.addFieldEndOffset();
+                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    runGen.nextFrame(frame);
+                    fta.reset(frame, true);
+                    if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+
+            public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
+                IFrameWriter delegatingWriter = new IFrameWriter() {
+                    @Override
+                    public void open() throws HyracksDataException {
+                        // do nothing
+                    }
+
+                    @Override
+                    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                        writer.nextFrame(buffer);
+                    }
+
+                    @Override
+                    public void flush() throws HyracksDataException {
+                        writer.flush();
+                    }
+
+                    @Override
+                    public void close() throws HyracksDataException {
+                        // do nothing
+                    }
+                };
+                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getFrameSorter(),
+                        runGen.getRuns(), new int[] { 0 }, comparatorFactories, recordDescriptors[0], framesLimit,
+                        delegatingWriter);
+                merger.process();
+            }
+        }
+
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @Override
+            public void initialize() throws HyracksDataException {
+                writer.open();
+                try {
+                    SortingRecordWriter recordWriter = new SortingRecordWriter();
+                    while (isp.next()) {
+                        InputSplit is = isp.getInputSplit();
+                        int blockId = isp.getBlockId();
+                        try {
+                            RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(is, taskAttemptContext);
+                            recordWriter.initBlock(blockId);
+                            Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, null, recordReader,
+                                    recordWriter, null, null, is);
+                            mapper.run(mCtx);
+                            recordWriter.sortAndFlushBlock(writer);
+                            writer.flush();
+                        } catch (IOException e) {
+                            throw new HyracksDataException(e);
+                        } catch (InterruptedException e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                } finally {
+                    writer.close();
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MarshalledWritable.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MarshalledWritable.java
new file mode 100644
index 0000000..4455c2b
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MarshalledWritable.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 University of California, Irvine
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.Writable;
+
+public class MarshalledWritable<T extends Writable> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private byte[] bytes;
+
+    public MarshalledWritable() {
+    }
+
+    public void set(T o) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        dos.writeUTF(o.getClass().getName());
+        o.write(dos);
+        dos.close();
+        bytes = baos.toByteArray();
+    }
+
+    public T get() throws Exception {
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        DataInputStream dis = new DataInputStream(bais);
+        String className = dis.readUTF();
+        T o = (T) HadoopTools.newInstance(className);
+        o.readFields(dis);
+        return o;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
new file mode 100644
index 0000000..8bedd02
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
@@ -0,0 +1,253 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progress;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class ReducerOperatorDescriptor<K2, V2, K3, V3> extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private MarshalledWritable<Configuration> mConfig;
+
+    public ReducerOperatorDescriptor(JobSpecification spec, MarshalledWritable<Configuration> mConfig) {
+        super(spec, 1, 0);
+        this.mConfig = mConfig;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        final HadoopHelper helper = new HadoopHelper(mConfig);
+        final Reducer<K2, V2, K3, V3> reducer = helper.getReducer();
+        final RecordDescriptor recordDescriptor = helper.getMapOutputRecordDescriptor();
+        final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx, recordDescriptor);
+        final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx, recordDescriptor);
+        final ByteBuffer copyFrame = ctx.getResourceManager().allocateFrame();
+        accessor1.reset(copyFrame);
+        final int[] groupFields = helper.getSortFields();
+        IBinaryComparatorFactory[] groupingComparators = helper.getGroupingComparatorFactories();
+        final IBinaryComparator[] comparators = new IBinaryComparator[groupingComparators.length];
+        for (int i = 0; i < comparators.length; ++i) {
+            comparators[i] = groupingComparators[i].createBinaryComparator();
+        }
+
+        class KVIterator implements RawKeyValueIterator {
+            private FrameTupleAccessor accessor;
+            private DataInputBuffer kBuffer;
+            private DataInputBuffer vBuffer;
+            private List<ByteBuffer> buffers;
+            private int bSize;
+            private int bPtr;
+            private int tIdx;
+            private boolean eog;
+
+            public KVIterator() {
+                accessor = new FrameTupleAccessor(ctx, recordDescriptor);
+                kBuffer = new DataInputBuffer();
+                vBuffer = new DataInputBuffer();
+            }
+
+            void reset(List<ByteBuffer> buffers, int bSize) {
+                this.buffers = buffers;
+                this.bSize = bSize;
+                bPtr = 0;
+                tIdx = 0;
+                eog = false;
+                if (bSize > 0) {
+                    accessor.reset(buffers.get(0));
+                    tIdx = -1;
+                } else {
+                    eog = true;
+                }
+            }
+
+            @Override
+            public DataInputBuffer getKey() throws IOException {
+                return kBuffer;
+            }
+
+            @Override
+            public DataInputBuffer getValue() throws IOException {
+                return vBuffer;
+            }
+
+            @Override
+            public boolean next() throws IOException {
+                while (true) {
+                    if (eog) {
+                        return false;
+                    }
+                    ++tIdx;
+                    if (accessor.getTupleCount() >= tIdx) {
+                        ++bPtr;
+                        if (bPtr >= bSize) {
+                            eog = true;
+                            continue;
+                        }
+                        tIdx = -1;
+                        accessor.reset(buffers.get(bPtr));
+                        continue;
+                    }
+                    kBuffer.reset(accessor.getBuffer().array(),
+                            FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, helper.KEY_FIELD_INDEX),
+                            accessor.getFieldLength(tIdx, helper.KEY_FIELD_INDEX));
+                    vBuffer.reset(accessor.getBuffer().array(),
+                            FrameUtils.getAbsoluteFieldStartOffset(accessor, tIdx, helper.VALUE_FIELD_INDEX),
+                            accessor.getFieldLength(tIdx, helper.VALUE_FIELD_INDEX));
+                    break;
+                }
+                return true;
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public Progress getProgress() {
+                return null;
+            }
+        }
+
+        final KVIterator kvi = new KVIterator();
+        final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(helper.getConfiguration(), null);
+        final RecordWriter<K3, V3> recordWriter;
+        try {
+            recordWriter = (RecordWriter<K3, V3>) helper.getOutputFormat().getRecordWriter(taskAttemptContext);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            private boolean first;
+            private boolean groupStarted;
+            private List<ByteBuffer> group;
+            private int bPtr;
+            private FrameTupleAppender fta;
+
+            @Override
+            public void open() throws HyracksDataException {
+                first = true;
+                groupStarted = false;
+                group = new ArrayList<ByteBuffer>();
+                bPtr = 0;
+                group.add(ctx.getResourceManager().allocateFrame());
+                fta = new FrameTupleAppender(ctx);
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                accessor0.reset(buffer);
+                int nTuples = accessor0.getTupleCount();
+                for (int i = 0; i < nTuples; ++i) {
+                    if (first) {
+                        groupInit();
+                        first = false;
+                    } else {
+                        if (i == 0) {
+                            switchGroupIfRequired(accessor1, accessor1.getTupleCount() - 1, accessor0, i);
+                        } else {
+                            switchGroupIfRequired(accessor0, i - 1, accessor0, i);
+                        }
+                    }
+                    accumulate(accessor0, i);
+                }
+                FrameUtils.copy(buffer, copyFrame);
+            }
+
+            private void accumulate(FrameTupleAccessor accessor, int tIndex) {
+                if (!fta.append(accessor, tIndex)) {
+                    ++bPtr;
+                    if (group.size() <= bPtr) {
+                        group.add(ctx.getResourceManager().allocateFrame());
+                    }
+                    fta.reset(group.get(bPtr), true);
+                    if (!fta.append(accessor, tIndex)) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+
+            private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+                    FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+                if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+                    reduce();
+                    groupInit();
+                }
+            }
+
+            private void groupInit() {
+                groupStarted = true;
+                bPtr = 0;
+                fta.reset(group.get(0), true);
+            }
+
+            private void reduce() throws HyracksDataException {
+                kvi.reset(group, bPtr + 1);
+                try {
+                    Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), null, kvi,
+                            null, null, recordWriter, null, null,
+                            (RawComparator<K2>) helper.getRawGroupingComparator(), (Class<K2>) helper.getJob()
+                                    .getMapOutputKeyClass(), (Class<V2>) helper.getJob().getMapOutputValueClass());
+                    reducer.run(rCtx);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+                groupStarted = false;
+            }
+
+            private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+                for (int i = 0; i < comparators.length; ++i) {
+                    int fIdx = groupFields[i];
+                    int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength()
+                            + a1.getFieldStartOffset(t1Idx, fIdx);
+                    int l1 = a1.getFieldLength(t1Idx, fIdx);
+                    int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength()
+                            + a2.getFieldStartOffset(t2Idx, fIdx);
+                    int l2 = a2.getFieldLength(t2Idx, fIdx);
+                    if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                if (groupStarted) {
+                    reduce();
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
new file mode 100644
index 0000000..e03d0c9
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunFileWriter;
+
+public class ShuffleFrameReader implements IFrameReader {
+    private final IHyracksContext ctx;
+    private final IConnectionDemultiplexer demux;
+    private final HadoopHelper helper;
+    private final RecordDescriptor recordDescriptor;
+    private List<File> runs;
+    private Map<File, Integer> file2BlockIdMap;
+    private Map<Integer, File> blockId2FileMap;
+    private int lastReadSender;
+    private RunFileReader reader;
+
+    public ShuffleFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux,
+            MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
+        this.ctx = ctx;
+        this.demux = demux;
+        helper = new HadoopHelper(mConfig);
+        this.recordDescriptor = helper.getMapOutputRecordDescriptor();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        runs = new LinkedList<File>();
+        file2BlockIdMap = new HashMap<File, Integer>();
+        blockId2FileMap = new HashMap<Integer, File>();
+        int nSenders = demux.getSenderCount();
+        RunInfo[] infos = new RunInfo[nSenders];
+        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptor);
+        while (true) {
+            IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
+            lastReadSender = (Integer) entry.getAttachment();
+            ByteBuffer netBuffer = entry.getReadBuffer();
+            accessor.reset(netBuffer);
+            int tupleCount = accessor.getTupleCount();
+            if (tupleCount == 0) {
+                int openEntries = demux.closeEntry(lastReadSender);
+                netBuffer.clear();
+                demux.unreadyEntry(lastReadSender);
+                if (openEntries == 0) {
+                    break;
+                }
+            } else {
+                RunInfo info = infos[lastReadSender];
+                int nTuples = accessor.getTupleCount();
+                for (int i = 0; i < nTuples; ++i) {
+                    int tBlockId = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                            FrameUtils.getAbsoluteFieldStartOffset(accessor, i, HadoopHelper.BLOCKID_FIELD_INDEX));
+                    if (info == null) {
+                        info = new RunInfo();
+                        info.reset(tBlockId);
+                        infos[lastReadSender] = info;
+                    } else if (info.blockId != tBlockId) {
+                        info.close();
+                        info.reset(tBlockId);
+                    }
+                    info.write(accessor, i);
+                }
+                netBuffer.clear();
+                demux.unreadyEntry(lastReadSender);
+            }
+        }
+        for (int i = 0; i < infos.length; ++i) {
+            RunInfo info = infos[i];
+            if (info != null) {
+                info.close();
+            }
+        }
+        infos = null;
+
+        File outFile;
+        try {
+            outFile = ctx.getResourceManager().createFile(ShuffleFrameReader.class.getName(), ".run");
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        int framesLimit = helper.getSortFrameLimit(ctx);
+        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 },
+                helper.getSortComparatorFactories(), recordDescriptor, framesLimit, new RunFileWriter(outFile));
+        merger.process();
+
+        try {
+            reader = new RunFileReader(outFile);
+        } catch (FileNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+        reader.open();
+    }
+
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        return reader.nextFrame(buffer);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        reader.close();
+    }
+
+    private class RunInfo {
+        private final ByteBuffer buffer;
+        private final FrameTupleAppender fta;
+
+        private File file;
+        private RandomAccessFile raf;
+        private FileChannel channel;
+        private int blockId;
+
+        public RunInfo() {
+            buffer = ctx.getResourceManager().allocateFrame();
+            fta = new FrameTupleAppender(ctx);
+        }
+
+        public void reset(int blockId) throws HyracksDataException {
+            this.blockId = blockId;
+            fta.reset(buffer, true);
+            try {
+                file = ctx.getResourceManager().createFile(ShuffleFrameReader.class.getName(), ".run");
+                raf = new RandomAccessFile(file, "rw");
+                channel = raf.getChannel();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        public void write(FrameTupleAccessor accessor, int tIdx) throws HyracksDataException {
+            if (!fta.append(accessor, tIdx)) {
+                flush();
+                if (!fta.append(accessor, tIdx)) {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+
+        public void close() throws HyracksDataException {
+            flush();
+            try {
+                channel.close();
+                raf.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            runs.add(file);
+            file2BlockIdMap.put(file, blockId);
+            blockId2FileMap.put(blockId, file);
+        }
+
+        private void flush() throws HyracksDataException {
+            if (fta.getTupleCount() <= 0) {
+                return;
+            }
+            buffer.limit(buffer.capacity());
+            buffer.position(0);
+            try {
+                channel.write(buffer);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            fta.reset(buffer, true);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/pom.xml b/hyracks-examples/onlineaggregation-example/pom.xml
new file mode 100644
index 0000000..f14e5ac
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/pom.xml
@@ -0,0 +1,19 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks.examples</groupId>
+  <artifactId>onlineaggregation-example</artifactId>
+  <version>0.1.3-SNAPSHOT</version>
+  <packaging>pom</packaging>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks-examples</artifactId>
+    <version>0.1.3-SNAPSHOT</version>
+  </parent>
+
+  <modules>
+    <module>onlineaggregationhelper</module>
+    <module>onlineaggregationclient</module>
+    <module>onlineaggregationapp</module>
+  </modules>
+</project>
diff --git a/hyracks-examples/pom.xml b/hyracks-examples/pom.xml
index f8e8820..189dfc6 100644
--- a/hyracks-examples/pom.xml
+++ b/hyracks-examples/pom.xml
@@ -16,5 +16,6 @@
     <module>text-example</module>
     <module>btree-example</module>
     <module>hyracks-integration-tests</module>
+    <module>onlineaggregation-example</module>
   </modules>
 </project>