Fixed bugs and word count now works

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_online_aggregation@199 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
index ec9b303..4607c4c 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
@@ -44,9 +44,13 @@
             <configuration>
               <programs>
                 <program>
-                  <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountMain</mainClass>
+                  <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.MapReduceMain</mainClass>
                   <name>onlineaggregationclient</name>
                 </program>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountJobGen</mainClass>
+                  <name>wcgen</name>
+                </program>
               </programs>
               <repositoryLayout>flat</repositoryLayout>
               <repositoryName>lib</repositoryName>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
index 57a6aac..f826eef 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
@@ -19,11 +19,13 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
 import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.examples.onlineaggregation.DefaultInputSplitProviderFactory;
 import edu.uci.ics.hyracks.examples.onlineaggregation.HashPartitioningShuffleConnectorDescriptor;
@@ -44,6 +46,9 @@
 
         @Option(name = "-conf", usage = "Configuration XML File", required = true)
         public File conf;
+
+        @Option(name = "-num-maps", usage = "Number of Map tasks (default: 1)")
+        public int numMaps = 1;
     }
 
     public static void main(String[] args) throws Exception {
@@ -56,7 +61,7 @@
 
         IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
 
-        JobSpecification job = createJob(conf);
+        JobSpecification job = createJob(options, conf);
 
         long start = System.currentTimeMillis();
         UUID jobId = hcc.createJob(options.app, job);
@@ -66,16 +71,20 @@
         System.err.println(start + " " + end + " " + (end - start));
     }
 
-    private static JobSpecification createJob(Configuration conf) throws Exception {
+    private static JobSpecification createJob(Options options, Configuration conf) throws Exception {
         JobSpecification spec = new JobSpecification();
 
         MarshalledWritable<Configuration> mConfig = new MarshalledWritable<Configuration>();
         mConfig.set(conf);
+        Job job = new Job(conf);
+
         MapperOperatorDescriptor<Writable, Writable, Writable, Writable> mapper = new MapperOperatorDescriptor<Writable, Writable, Writable, Writable>(
                 spec, mConfig, new DefaultInputSplitProviderFactory(mConfig));
+        mapper.setPartitionConstraint(new PartitionCountConstraint(options.numMaps));
 
         ReducerOperatorDescriptor<Writable, Writable, Writable, Writable> reducer = new ReducerOperatorDescriptor<Writable, Writable, Writable, Writable>(
                 spec, mConfig);
+        reducer.setPartitionConstraint(new PartitionCountConstraint(job.getNumReduceTasks()));
 
         HashPartitioningShuffleConnectorDescriptor conn = new HashPartitioningShuffleConnectorDescriptor(spec, mConfig);
         spec.connect(conn, mapper, 0, reducer, 0);
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java
new file mode 100644
index 0000000..0d2ba77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount.WordCountMapper;
+import edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount.WordCountReducer;
+
+public class WordCountJobGen {
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+        Job job = new Job(conf, "word count");
+        job.setMapperClass(WordCountMapper.class);
+        job.setCombinerClass(WordCountReducer.class);
+        job.setReducerClass(WordCountReducer.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+        job.getConfiguration().writeXml(System.out);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/DefaultInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/DefaultInputSplitProviderFactory.java
index 70a0103..08fd195 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/DefaultInputSplitProviderFactory.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/DefaultInputSplitProviderFactory.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class DefaultInputSplitProviderFactory implements IInputSplitProviderFactory {
+    private static final long serialVersionUID = 1L;
+
     private MarshalledWritable<Configuration> mConfig;
 
     public DefaultInputSplitProviderFactory(MarshalledWritable<Configuration> mConfig) {
@@ -34,7 +36,7 @@
         HadoopHelper helper = new HadoopHelper(mConfig);
         final List<InputSplit> splits;
         try {
-            JobContext jCtx = new JobContext(mConfig.get(), null);
+            JobContext jCtx = helper.createJobContext();
             splits = helper.getInputFormat().getSplits(jCtx);
         } catch (Exception e) {
             throw new HyracksDataException(e);
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
index a16c63d..79498bb 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
@@ -22,10 +22,13 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
@@ -51,11 +54,16 @@
 
     public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
         this.mConfig = mConfig;
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
         try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
             config = mConfig.get();
+            config.setClassLoader(getClass().getClassLoader());
             job = new Job(config);
         } catch (Exception e) {
             throw new HyracksDataException(e);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
         }
     }
 
@@ -73,6 +81,26 @@
         }
     }
 
+    public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(config.getClassLoader());
+            return new TaskAttemptContext(config, taId);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
+    public JobContext createJobContext() {
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(config.getClassLoader());
+            return new JobContext(config, null);
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
+    }
+
     public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
         try {
             return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
index 70ab817..7701b25 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
@@ -14,8 +14,10 @@
  */
 package edu.uci.ics.hyracks.examples.onlineaggregation;
 
+import java.io.Serializable;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-public interface IInputSplitProviderFactory {
+public interface IInputSplitProviderFactory extends Serializable {
     public IInputSplitProvider createInputSplitProvider() throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
index 1362fa4..381b297 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
@@ -66,7 +67,8 @@
         final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
         final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
         final IInputSplitProvider isp = isProviderFactory.createInputSplitProvider();
-        final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(conf, null);
+        final TaskAttemptID taId = new TaskAttemptID("foo", 0, true, 0, 0);
+        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
 
         final int framesLimit = helper.getSortFrameLimit(ctx);
         final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
@@ -93,10 +95,6 @@
 
             @Override
             public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
-                if (fta.getTupleCount() > 0) {
-                    runGen.nextFrame(frame);
-                    fta.reset(frame, true);
-                }
             }
 
             @Override
@@ -119,6 +117,11 @@
             }
 
             public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
+                if (fta.getTupleCount() > 0) {
+                    runGen.nextFrame(frame);
+                    fta.reset(frame, true);
+                }
+                runGen.close();
                 IFrameWriter delegatingWriter = new IFrameWriter() {
                     @Override
                     public void open() throws HyracksDataException {
@@ -158,10 +161,18 @@
                         int blockId = isp.getBlockId();
                         try {
                             RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(is, taskAttemptContext);
+                            ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+                            try {
+                                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                                recordReader.initialize(is, taskAttemptContext);
+                            } finally {
+                                Thread.currentThread().setContextClassLoader(ctxCL);
+                            }
                             recordWriter.initBlock(blockId);
-                            Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, null, recordReader,
+                            Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, taId, recordReader,
                                     recordWriter, null, null, is);
                             mapper.run(mCtx);
+                            recordReader.close();
                             recordWriter.sortAndFlushBlock(writer);
                             writer.flush();
                         } catch (IOException e) {
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
index 558f4f6..69ec344 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.hyracks.examples.onlineaggregation;
 
 import java.io.IOException;
@@ -10,9 +24,11 @@
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progress;
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
@@ -105,7 +121,7 @@
                         return false;
                     }
                     ++tIdx;
-                    if (accessor.getTupleCount() >= tIdx) {
+                    if (accessor.getTupleCount() <= tIdx) {
                         ++bPtr;
                         if (bPtr >= bSize) {
                             eog = true;
@@ -138,7 +154,8 @@
         }
 
         final KVIterator kvi = new KVIterator();
-        final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(helper.getConfiguration(), null);
+        final TaskAttemptID taId = new TaskAttemptID("foo", 0, false, 0, 0);
+        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
         final RecordWriter recordWriter;
         try {
             recordWriter = helper.getOutputFormat().getRecordWriter(taskAttemptContext);
@@ -152,6 +169,8 @@
             private List<ByteBuffer> group;
             private int bPtr;
             private FrameTupleAppender fta;
+            private Counter keyCounter;
+            private Counter valueCounter;
 
             @Override
             public void open() throws HyracksDataException {
@@ -161,6 +180,10 @@
                 bPtr = 0;
                 group.add(ctx.getResourceManager().allocateFrame());
                 fta = new FrameTupleAppender(ctx);
+                keyCounter = new Counter() {
+                };
+                valueCounter = new Counter() {
+                };
             }
 
             @Override
@@ -213,8 +236,8 @@
             private void reduce() throws HyracksDataException {
                 kvi.reset(group, bPtr + 1);
                 try {
-                    Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), null, kvi,
-                            null, null, recordWriter, null, null,
+                    Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), taId, kvi,
+                            keyCounter, valueCounter, recordWriter, null, null,
                             (RawComparator<K2>) helper.getRawGroupingComparator(), (Class<K2>) helper.getJob()
                                     .getMapOutputKeyClass(), (Class<V2>) helper.getJob().getMapOutputValueClass());
                     reducer.run(rCtx);
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
index e03d0c9..56b4b28 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
@@ -193,7 +193,11 @@
             buffer.limit(buffer.capacity());
             buffer.position(0);
             try {
-                channel.write(buffer);
+                int remaining = buffer.remaining();
+                while (remaining > 0) {
+                    int wLen = channel.write(buffer);
+                    remaining -= wLen;
+                }
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountMapper.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountMapper.java
new file mode 100644
index 0000000..17383ba
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountMapper.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+    private static final IntWritable ONE = new IntWritable(1);
+
+    private Text word = new Text();
+
+    @Override
+    public void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
+        StringTokenizer tok = new StringTokenizer(value.toString());
+        while (tok.hasMoreTokens()) {
+            word.set(tok.nextToken());
+            ctx.write(word, ONE);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountReducer.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountReducer.java
new file mode 100644
index 0000000..eb4e926
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountReducer.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    @Override
+    public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
+        int sum = 0;
+        for (IntWritable val : values) {
+            sum += val.get();
+        }
+        result.set(sum);
+        ctx.write(key, result);
+    }
+}
\ No newline at end of file