Added Binary Preclustered Group Operator. The preclustered and hash-based group operators now use the same aggregator interface. Added a Frame Writing Operator.

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@132 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index c06bd40..67bcd63 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -25,18 +25,20 @@
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.SumStringGroupAggregator;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -60,9 +62,11 @@
         RecordDescriptor desc = new RecordDescriptor(
                 new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
-                        ','), desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
         PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
@@ -74,21 +78,28 @@
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                new int[] { 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                desc2);
         PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         group.setPartitionConstraint(groupPartitionConstraint);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
         PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
+        RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+                        new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
         PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         group2.setPartitionConstraint(groupPartitionConstraint2);
@@ -130,9 +141,11 @@
         RecordDescriptor desc = new RecordDescriptor(
                 new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
-                        ','), desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
         PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
@@ -145,22 +158,29 @@
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                new int[] { 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                desc2);
         PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         group.setPartitionConstraint(groupPartitionConstraint);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
         PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
+        RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+                        new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
         PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         group2.setPartitionConstraint(groupPartitionConstraint2);
@@ -202,9 +222,11 @@
         RecordDescriptor desc = new RecordDescriptor(
                 new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
-                        ','), desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
         PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
@@ -217,22 +239,29 @@
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
+                spec,
+                new int[] { 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                desc2);
         PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 1 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
+        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
         PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
+        RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+                        new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
         PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         group2.setPartitionConstraint(groupPartitionConstraint2);
diff --git a/hyracks-examples/pom.xml b/hyracks-examples/pom.xml
index c3371dc..9596b37 100644
--- a/hyracks-examples/pom.xml
+++ b/hyracks-examples/pom.xml
@@ -13,6 +13,7 @@
 
   <modules>
     <module>tpch-example</module>
+    <module>text-example</module>
     <module>hyracks-integration-tests</module>
   </modules>
 </project>
diff --git a/hyracks-examples/tpch-example/tpchhelper/.project b/hyracks-examples/text-example/.project
similarity index 64%
copy from hyracks-examples/tpch-example/tpchhelper/.project
copy to hyracks-examples/text-example/.project
index 41b2574..4e057cd 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.project
+++ b/hyracks-examples/text-example/.project
@@ -1,16 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <projectDescription>
-	<name>tpchhelper</name>
+	<name>text-example</name>
 	<comment></comment>
 	<projects>
 	</projects>
 	<buildSpec>
 		<buildCommand>
-			<name>org.eclipse.jdt.core.javabuilder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-		<buildCommand>
 			<name>org.maven.ide.eclipse.maven2Builder</name>
 			<arguments>
 			</arguments>
@@ -18,6 +13,5 @@
 	</buildSpec>
 	<natures>
 		<nature>org.maven.ide.eclipse.maven2Nature</nature>
-		<nature>org.eclipse.jdt.core.javanature</nature>
 	</natures>
 </projectDescription>
diff --git a/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/text-example/.settings/org.maven.ide.eclipse.prefs
similarity index 88%
rename from hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
rename to hyracks-examples/text-example/.settings/org.maven.ide.eclipse.prefs
index 032b327..4562b1a 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
+++ b/hyracks-examples/text-example/.settings/org.maven.ide.eclipse.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
 activeProfiles=
 eclipse.preferences.version=1
 fullBuildGoals=process-test-resources
diff --git a/hyracks-examples/text-example/pom.xml b/hyracks-examples/text-example/pom.xml
new file mode 100644
index 0000000..f6b8d8f
--- /dev/null
+++ b/hyracks-examples/text-example/pom.xml
@@ -0,0 +1,19 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks.examples</groupId>
+  <artifactId>text-example</artifactId> <!-- Aggregator POM for the text example: it only groups the modules listed below. -->
+  <version>0.1.3-SNAPSHOT</version>
+  <packaging>pom</packaging> <!-- "pom" packaging: this project has no code of its own. -->
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks-examples</artifactId> <!-- hyracks-examples lists this project in its own modules section. -->
+    <version>0.1.3-SNAPSHOT</version>
+  </parent>
+
+  <modules>
+    <module>texthelper</module> <!-- textclient and textapp both declare a dependency on texthelper. -->
+    <module>textclient</module>
+    <module>textapp</module>
+  </modules>
+</project>
diff --git a/hyracks-examples/text-example/textapp/.classpath b/hyracks-examples/text-example/textapp/.classpath
new file mode 100644
index 0000000..3f62785
--- /dev/null
+++ b/hyracks-examples/text-example/textapp/.classpath
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.4"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-examples/tpch-example/tpchhelper/.project b/hyracks-examples/text-example/textapp/.project
similarity index 95%
copy from hyracks-examples/tpch-example/tpchhelper/.project
copy to hyracks-examples/text-example/textapp/.project
index 41b2574..4f3af14 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.project
+++ b/hyracks-examples/text-example/textapp/.project
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <projectDescription>
-	<name>tpchhelper</name>
+	<name>textapp</name>
 	<comment></comment>
 	<projects>
 	</projects>
@@ -17,7 +17,7 @@
 		</buildCommand>
 	</buildSpec>
 	<natures>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
 		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
 	</natures>
 </projectDescription>
diff --git a/hyracks-examples/text-example/textapp/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/text-example/textapp/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..37272d9
--- /dev/null
+++ b/hyracks-examples/text-example/textapp/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Tue Sep 28 14:37:42 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.4
+org.eclipse.jdt.core.compiler.compliance=1.4
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.4
diff --git a/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/text-example/textapp/.settings/org.maven.ide.eclipse.prefs
similarity index 88%
copy from hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
copy to hyracks-examples/text-example/textapp/.settings/org.maven.ide.eclipse.prefs
index 032b327..4562b1a 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
+++ b/hyracks-examples/text-example/textapp/.settings/org.maven.ide.eclipse.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
 activeProfiles=
 eclipse.preferences.version=1
 fullBuildGoals=process-test-resources
diff --git a/hyracks-examples/text-example/textapp/pom.xml b/hyracks-examples/text-example/textapp/pom.xml
new file mode 100644
index 0000000..706ae72
--- /dev/null
+++ b/hyracks-examples/text-example/textapp/pom.xml
@@ -0,0 +1,58 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+  <artifactId>textapp</artifactId> <!-- Bundles this module's dependencies into a zip with a lib/ directory (see app-assembly.xml). -->
+  <version>0.1.3-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks.examples</groupId>
+    <artifactId>text-example</artifactId>
+    <version>0.1.3-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin> <!-- Step 1: copy the dependency jars into a staging directory. -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>target/application/lib</outputDirectory> <!-- Same directory that app-assembly.xml reads from. -->
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin> <!-- Step 2: zip the staging directory using the assembly descriptor. -->
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2-beta-5</version>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/app-assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+            <phase>package</phase> <!-- NOTE(review): same phase as copy-dependencies; assumes that execution runs first. Confirm ordering. -->
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency> <!-- The only declared dependency; it is what copy-dependencies stages for the zip. -->
+  		<groupId>edu.uci.ics.hyracks.examples.text</groupId>
+  		<artifactId>texthelper</artifactId>
+  		<version>0.1.3-SNAPSHOT</version>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-examples/text-example/textapp/src/main/assembly/app-assembly.xml b/hyracks-examples/text-example/textapp/src/main/assembly/app-assembly.xml
new file mode 100644
index 0000000..43ace6c
--- /dev/null
+++ b/hyracks-examples/text-example/textapp/src/main/assembly/app-assembly.xml
@@ -0,0 +1,13 @@
+<assembly> <!-- Assembly descriptor referenced by textapp/pom.xml: zips the staged dependency jars. -->
+  <id>app-assembly</id>
+  <formats>
+    <format>zip</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory> <!-- Entries sit at the archive root instead of under a wrapping directory. -->
+  <fileSets>
+    <fileSet>
+      <directory>target/application/lib</directory> <!-- Output directory of the copy-dependencies execution in textapp/pom.xml. -->
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hyracks-examples/tpch-example/tpchhelper/.classpath b/hyracks-examples/text-example/textclient/.classpath
similarity index 100%
copy from hyracks-examples/tpch-example/tpchhelper/.classpath
copy to hyracks-examples/text-example/textclient/.classpath
diff --git a/hyracks-examples/tpch-example/tpchhelper/.project b/hyracks-examples/text-example/textclient/.project
similarity index 95%
rename from hyracks-examples/tpch-example/tpchhelper/.project
rename to hyracks-examples/text-example/textclient/.project
index 41b2574..04307d3 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.project
+++ b/hyracks-examples/text-example/textclient/.project
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <projectDescription>
-	<name>tpchhelper</name>
+	<name>textclient</name>
 	<comment></comment>
 	<projects>
 	</projects>
@@ -17,7 +17,7 @@
 		</buildCommand>
 	</buildSpec>
 	<natures>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
 		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
 	</natures>
 </projectDescription>
diff --git a/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/text-example/textclient/.settings/org.eclipse.jdt.core.prefs
similarity index 88%
rename from hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs
rename to hyracks-examples/text-example/textclient/.settings/org.eclipse.jdt.core.prefs
index 4898439..8599738 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-examples/text-example/textclient/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
 eclipse.preferences.version=1
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/text-example/textclient/.settings/org.maven.ide.eclipse.prefs
similarity index 88%
copy from hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
copy to hyracks-examples/text-example/textclient/.settings/org.maven.ide.eclipse.prefs
index 032b327..4562b1a 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
+++ b/hyracks-examples/text-example/textclient/.settings/org.maven.ide.eclipse.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
 activeProfiles=
 eclipse.preferences.version=1
 fullBuildGoals=process-test-resources
diff --git a/hyracks-examples/text-example/textclient/pom.xml b/hyracks-examples/text-example/textclient/pom.xml
new file mode 100644
index 0000000..c56a6b8
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/pom.xml
@@ -0,0 +1,80 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+  <artifactId>textclient</artifactId> <!-- Client module for the text example; its launcher runs WordCountMain (configured below). -->
+  <version>0.1.3-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks.examples</groupId>
+    <artifactId>text-example</artifactId>
+    <version>0.1.3-SNAPSHOT</version>
+  </parent>
+
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-std</artifactId>
+  		<version>0.1.3-SNAPSHOT</version>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks.examples.text</groupId>
+  		<artifactId>texthelper</artifactId>
+  		<version>0.1.3-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin> <!-- Compile for Java 1.6 source and target. -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <plugin> <!-- Generate a launcher program plus a lib directory of jars at package time. -->
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <configuration>
+              <programs>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.examples.text.client.WordCountMain</mainClass>
+                  <name>textclient</name> <!-- Name given to the generated program. -->
+                </program>
+              </programs>
+              <repositoryLayout>flat</repositoryLayout> <!-- Jars are placed directly in the repository directory, with no nested group paths. -->
+              <repositoryName>lib</repositoryName> <!-- binary-assembly.xml picks this up as target/appassembler/lib. -->
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>assemble</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin> <!-- Package the appassembler output using the binary assembly descriptor. -->
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2-beta-5</version>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+            <phase>package</phase> <!-- NOTE(review): same phase as the appassembler execution; assumes that one runs first. Confirm ordering. -->
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml b/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly> <!-- Assembly descriptor referenced by textclient/pom.xml: packages the bin and lib directories under target/appassembler. -->
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format> <!-- Archive form of the distribution. -->
+    <format>dir</format> <!-- Unpacked directory form of the same content. -->
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory> <!-- bin/ and lib/ sit at the root of the output. -->
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory> <!-- Presumably the launcher scripts generated by appassembler-maven-plugin; verify against textclient/pom.xml. -->
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode> <!-- Files under bin/ are packaged with executable permissions. -->
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory> <!-- Matches the repositoryName "lib" configured in textclient/pom.xml. -->
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
new file mode 100644
index 0000000..96045d2
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.text.client;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.text.WordTupleParserFactory;
+
+/**
+ * Command-line client that builds a word-count job, submits it through an
+ * {@link IHyracksClientConnection} and reports the elapsed wall-clock time.
+ */
+public class WordCountMain {
+    /** Command-line options; populated by the CmdLineParser created in main. */
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        public int port = 1099;
+
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+
+        @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+        public String inFileSplits;
+
+        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+        public String outFileSplits;
+
+        // Optional switch: omit it to select the sort based plan. A required boolean
+        // switch could never be left off, which made that plan unreachable.
+        @Option(name = "-use-hash", usage = "Use Hash based grouping", required = false)
+        public boolean useHash;
+
+        @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
+        public int htSize = 8191;
+
+        @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
+        public int sbSize = 32768;
+    }
+
+    /**
+     * Parses the command line, builds the job, runs it to completion and prints
+     * "start end elapsed" (all in milliseconds) to stderr.
+     *
+     * @param args command-line arguments; see the Options class for the switches
+     * @throws Exception if argument parsing or job execution fails
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+
+        JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
+                options.useHash, options.htSize, options.sbSize);
+
+        long start = System.currentTimeMillis();
+        UUID jobId = hcc.createJob(options.app, job);
+        hcc.start(jobId);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    /**
+     * Converts a comma separated list of {@code <node-name>:<path>} entries into
+     * file splits. The first ':' of each entry separates node name from path.
+     *
+     * @param fileSplits the comma separated split specification
+     * @return one split per entry, in the order given
+     * @throws IllegalArgumentException if an entry contains no ':'
+     */
+    private static FileSplit[] parseFileSplits(String fileSplits) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new File(s.substring(idx + 1)));
+        }
+        return fSplits;
+    }
+
+    /**
+     * Assembles the word-count dataflow: scan words, hash-partition them on the
+     * word, count occurrences per word and write the result frames to files.
+     *
+     * @param inSplits input file splits; the scanner is placed on their nodes
+     * @param outSplits output file splits; grouping and writing happen on their nodes
+     * @param useHash {@code true} for hash based grouping, {@code false} for an
+     *            external sort followed by preclustered grouping
+     * @param htSize hash table size used by the hash based plan
+     * @param sbSize sort buffer size, in frames, used by the sort based plan
+     * @return the job specification with the file writer as its root
+     */
+    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, boolean useHash, int htSize,
+            int sbSize) {
+        JobSpecification spec = new JobSpecification();
+
+        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+        RecordDescriptor wordDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor wordScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+                new WordTupleParserFactory(), wordDesc);
+        wordScanner.setPartitionConstraint(createPartitionConstraint(inSplits));
+
+        RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        IOperatorDescriptor gBy;
+        if (useHash) {
+            gBy = new HashGroupOperatorDescriptor(spec, new int[] { 0 },
+                    new FieldHashPartitionComputerFactory(new int[] { 0 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new MultiAggregatorFactory(
+                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    groupResultDesc, htSize);
+            gBy.setPartitionConstraint(createPartitionConstraint(outSplits));
+            IConnectorDescriptor scanGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 0 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
+        } else {
+            ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, sbSize, new int[] { 0 },
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, wordDesc);
+            // Place the sorter on the output nodes, as the hash plan does for its
+            // group operator, so it lines up with the one-to-one connectors below.
+            sorter.setPartitionConstraint(createPartitionConstraint(outSplits));
+
+            IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 0 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanSortConn, wordScanner, 0, sorter, 0);
+
+            gBy = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new MultiAggregatorFactory(
+                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    groupResultDesc);
+            gBy.setPartitionConstraint(createPartitionConstraint(outSplits));
+            OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
+            spec.connect(sortGroupConn, sorter, 0, gBy, 0);
+        }
+
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        writer.setPartitionConstraint(createPartitionConstraint(outSplits));
+
+        IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(gbyPrinterConn, gBy, 0, writer, 0);
+
+        spec.addRoot(writer);
+        return spec;
+    }
+
+    /**
+     * Builds an explicit partition constraint with one absolute location per
+     * split, taken from the split's node name.
+     *
+     * @param splits the file splits whose nodes should host the partitions
+     * @return the constraint, with locations in split order
+     */
+    private static PartitionConstraint createPartitionConstraint(FileSplit[] splits) {
+        LocationConstraint[] lConstraints = new LocationConstraint[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            lConstraints[i] = new AbsoluteLocationConstraint(splits[i].getNodeName());
+        }
+        return new ExplicitPartitionConstraint(lConstraints);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/tpch-example/tpchhelper/.classpath b/hyracks-examples/text-example/texthelper/.classpath
similarity index 100%
rename from hyracks-examples/tpch-example/tpchhelper/.classpath
rename to hyracks-examples/text-example/texthelper/.classpath
diff --git a/hyracks-examples/tpch-example/tpchhelper/.project b/hyracks-examples/text-example/texthelper/.project
similarity index 95%
copy from hyracks-examples/tpch-example/tpchhelper/.project
copy to hyracks-examples/text-example/texthelper/.project
index 41b2574..19ce234 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.project
+++ b/hyracks-examples/text-example/texthelper/.project
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <projectDescription>
-	<name>tpchhelper</name>
+	<name>texthelper</name>
 	<comment></comment>
 	<projects>
 	</projects>
@@ -17,7 +17,7 @@
 		</buildCommand>
 	</buildSpec>
 	<natures>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
 		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
 	</natures>
 </projectDescription>
diff --git a/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/text-example/texthelper/.settings/org.eclipse.jdt.core.prefs
similarity index 88%
copy from hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs
copy to hyracks-examples/text-example/texthelper/.settings/org.eclipse.jdt.core.prefs
index 4898439..8599738 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-examples/text-example/texthelper/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
 eclipse.preferences.version=1
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/text-example/texthelper/.settings/org.maven.ide.eclipse.prefs
similarity index 88%
copy from hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
copy to hyracks-examples/text-example/texthelper/.settings/org.maven.ide.eclipse.prefs
index 032b327..4562b1a 100644
--- a/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
+++ b/hyracks-examples/text-example/texthelper/.settings/org.maven.ide.eclipse.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
 activeProfiles=
 eclipse.preferences.version=1
 fullBuildGoals=process-test-resources
diff --git a/hyracks-examples/tpch-example/tpchhelper/pom.xml b/hyracks-examples/text-example/texthelper/pom.xml
similarity index 89%
rename from hyracks-examples/tpch-example/tpchhelper/pom.xml
rename to hyracks-examples/text-example/texthelper/pom.xml
index ed35716..b0dae86 100644
--- a/hyracks-examples/tpch-example/tpchhelper/pom.xml
+++ b/hyracks-examples/text-example/texthelper/pom.xml
@@ -1,12 +1,12 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
-  <artifactId>tpchhelper</artifactId>
+  <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+  <artifactId>texthelper</artifactId>
   <version>0.1.3-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
-    <artifactId>tpch-example</artifactId>
+    <artifactId>text-example</artifactId>
     <version>0.1.3-SNAPSHOT</version>
   </parent>
 
diff --git a/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java b/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
new file mode 100644
index 0000000..9825fdf
--- /dev/null
+++ b/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
@@ -0,0 +1,160 @@
+package edu.uci.ics.hyracks.examples.text;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+/**
+ * Tuple parser factory that splits a character stream on whitespace and emits
+ * each piece as a single-field UTF8 string tuple.
+ */
+public class WordTupleParserFactory implements ITupleParserFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ITupleParser createTupleParser(final IHyracksContext ctx) {
+        return new ITupleParser() {
+            @Override
+            public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+                try {
+                    ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+                    FrameTupleAppender appender = new FrameTupleAppender(ctx);
+                    appender.reset(frame, true);
+                    ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+                    DataOutput dos = tb.getDataOutput();
+
+                    IValueParser utf8StringParser = UTF8StringParserFactory.INSTANCE.createValueParser();
+                    // NOTE(review): no charset is given, so the bytes are decoded with the
+                    // platform default charset - confirm this is intended.
+                    WordCursor cursor = new WordCursor(new InputStreamReader(in));
+                    while (cursor.nextWord()) {
+                        tb.reset();
+                        utf8StringParser.parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
+                        tb.addFieldEndOffset();
+                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            // append() refused the tuple (presumably a full frame): flush and retry.
+                            FrameUtils.flushFrame(frame, writer);
+                            appender.reset(frame, true);
+                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                throw new IllegalStateException();
+                            }
+                        }
+                    }
+                    if (appender.getTupleCount() > 0) {
+                        FrameUtils.flushFrame(frame, writer);
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Incremental whitespace tokenizer over a {@link Reader}. After a successful
+     * {@link #nextWord()} the current word is {@code buffer[fStart, fEnd)}.
+     */
+    private static class WordCursor {
+        private static final int INITIAL_BUFFER_SIZE = 4096;
+        private static final int INCREMENT = 4096;
+
+        private char[] buffer;
+
+        private int start;
+        private int end;
+        private boolean eof;
+
+        private int fStart;
+        private int fEnd;
+        private Reader in;
+
+        public WordCursor(Reader in) {
+            this.in = in;
+            buffer = new char[INITIAL_BUFFER_SIZE];
+            start = 0;
+            end = 0;
+            eof = false;
+        }
+
+        /**
+         * Advances to the next whitespace-delimited word. Adjacent whitespace
+         * characters still yield an empty word between them.
+         *
+         * @return {@code true} if {@code fStart}/{@code fEnd} describe a new word,
+         *         {@code false} once the input is exhausted
+         * @throws IOException if the underlying reader fails
+         */
+        public boolean nextWord() throws IOException {
+            if (eof) {
+                return false;
+            }
+
+            int p = start;
+            while (true) {
+                if (p >= end) {
+                    int s = start;
+                    eof = !readMore();
+                    if (eof) {
+                        // readMore() moved the unread tail to the buffer front before it
+                        // saw end of input, so the final word is [start, end). Publish
+                        // those offsets instead of leaving the previous word's in place,
+                        // and report no word at all when nothing is left.
+                        fStart = start;
+                        fEnd = end;
+                        return fEnd > fStart;
+                    }
+                    p -= (s - start);
+                }
+                char ch = buffer[p];
+                if (Character.isWhitespace(ch)) {
+                    fStart = start;
+                    fEnd = p;
+                    start = p + 1;
+                    return true;
+                }
+                ++p;
+            }
+        }
+
+        /**
+         * Moves the unread characters to the front of the buffer, grows the
+         * buffer by {@code INCREMENT} if it is full, and reads more input.
+         *
+         * @return {@code false} if the reader reported end of input
+         * @throws IOException if the underlying reader fails
+         */
+        private boolean readMore() throws IOException {
+            if (start > 0) {
+                System.arraycopy(buffer, start, buffer, 0, end - start);
+            }
+            end -= start;
+            start = 0;
+
+            if (end == buffer.length) {
+                buffer = Arrays.copyOf(buffer, buffer.length + INCREMENT);
+            }
+
+            int n = in.read(buffer, end, buffer.length - end);
+            if (n < 0) {
+                return false;
+            }
+            end += n;
+            return true;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/tpch-example/pom.xml b/hyracks-examples/tpch-example/pom.xml
index d81d4ce..c30df2b 100644
--- a/hyracks-examples/tpch-example/pom.xml
+++ b/hyracks-examples/tpch-example/pom.xml
@@ -12,7 +12,6 @@
   </parent>
 
   <modules>
-    <module>tpchhelper</module>
     <module>tpchclient</module>
     <module>tpchapp</module>
   </modules>
diff --git a/hyracks-examples/tpch-example/tpchapp/pom.xml b/hyracks-examples/tpch-example/tpchapp/pom.xml
index d95511d..cc0b0b9 100644
--- a/hyracks-examples/tpch-example/tpchapp/pom.xml
+++ b/hyracks-examples/tpch-example/tpchapp/pom.xml
@@ -48,11 +48,11 @@
     </plugins>
   </build>
   <dependencies>
-  	<dependency>
-  		<groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
-  		<artifactId>tpchhelper</artifactId>
-  		<version>0.1.3-SNAPSHOT</version>
-  		<scope>compile</scope>
-  	</dependency>
+    <dependency>
+        <groupId>edu.uci.ics.hyracks</groupId>
+        <artifactId>hyracks-dataflow-std</artifactId>
+        <version>0.1.3-SNAPSHOT</version>
+        <scope>compile</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks-examples/tpch-example/tpchclient/pom.xml
index f69b7c8..7a49171 100644
--- a/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -17,13 +17,6 @@
   		<version>0.1.3-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
-  	<dependency>
-  		<groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
-  		<artifactId>tpchhelper</artifactId>
-  		<version>0.1.3-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index d90f384..5bc5479 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -24,6 +24,9 @@
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -34,7 +37,6 @@
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
-import edu.uci.ics.hyracks.examples.tpch.helper.CountAccumulatingAggregatorFactory;
 
 public class Main {
     public static void main(String[] args) throws Exception {
@@ -119,11 +121,14 @@
         RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
-        HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(spec, new int[] { 6 },
+        HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
+                spec,
+                new int[] { 6 },
                 new FieldHashPartitionComputerFactory(new int[] { 6 },
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new CountAccumulatingAggregatorFactory(), groupResultDesc, 16);
+                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                groupResultDesc, 16);
         gby.setPartitionConstraint(new PartitionCountConstraint(4));
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
diff --git a/hyracks-examples/tpch-example/tpchhelper/src/main/java/edu/uci/ics/hyracks/examples/tpch/helper/CountAccumulatingAggregatorFactory.java b/hyracks-examples/tpch-example/tpchhelper/src/main/java/edu/uci/ics/hyracks/examples/tpch/helper/CountAccumulatingAggregatorFactory.java
deleted file mode 100644
index cbc8a57..0000000
--- a/hyracks-examples/tpch-example/tpchhelper/src/main/java/edu/uci/ics/hyracks/examples/tpch/helper/CountAccumulatingAggregatorFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.uci.ics.hyracks.examples.tpch.helper;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregator;
-import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregatorFactory;
-
-public class CountAccumulatingAggregatorFactory implements IAccumulatingAggregatorFactory {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IAccumulatingAggregator createAggregator(RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor) {
-        final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-        return new IAccumulatingAggregator() {
-            private int count;
-
-            @Override
-            public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex)
-                    throws HyracksDataException {
-                tb.reset();
-                tb.addField(accessor, tIndex, 0);
-                tb.addField(IntegerSerializerDeserializer.INSTANCE, count);
-                return appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-            }
-
-            @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                count = 0;
-            }
-
-            @Override
-            public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                ++count;
-            }
-        };
-    }
-}
\ No newline at end of file