Fixed bugs and word count now works
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_online_aggregation@199 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
index ec9b303..4607c4c 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
@@ -44,9 +44,13 @@
<configuration>
<programs>
<program>
- <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountMain</mainClass>
+ <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.MapReduceMain</mainClass>
<name>onlineaggregationclient</name>
</program>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountJobGen</mainClass>
+ <name>wcgen</name>
+ </program>
</programs>
<repositoryLayout>flat</repositoryLayout>
<repositoryName>lib</repositoryName>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
index 57a6aac..f826eef 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
@@ -19,11 +19,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.examples.onlineaggregation.DefaultInputSplitProviderFactory;
import edu.uci.ics.hyracks.examples.onlineaggregation.HashPartitioningShuffleConnectorDescriptor;
@@ -44,6 +46,9 @@
@Option(name = "-conf", usage = "Configuration XML File", required = true)
public File conf;
+
+ @Option(name = "-num-maps", usage = "Number of Map tasks (default: 1)")
+ public int numMaps = 1;
}
public static void main(String[] args) throws Exception {
@@ -56,7 +61,7 @@
IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
- JobSpecification job = createJob(conf);
+ JobSpecification job = createJob(options, conf);
long start = System.currentTimeMillis();
UUID jobId = hcc.createJob(options.app, job);
@@ -66,16 +71,20 @@
System.err.println(start + " " + end + " " + (end - start));
}
- private static JobSpecification createJob(Configuration conf) throws Exception {
+ private static JobSpecification createJob(Options options, Configuration conf) throws Exception {
JobSpecification spec = new JobSpecification();
MarshalledWritable<Configuration> mConfig = new MarshalledWritable<Configuration>();
mConfig.set(conf);
+ Job job = new Job(conf);
+
MapperOperatorDescriptor<Writable, Writable, Writable, Writable> mapper = new MapperOperatorDescriptor<Writable, Writable, Writable, Writable>(
spec, mConfig, new DefaultInputSplitProviderFactory(mConfig));
+ mapper.setPartitionConstraint(new PartitionCountConstraint(options.numMaps));
ReducerOperatorDescriptor<Writable, Writable, Writable, Writable> reducer = new ReducerOperatorDescriptor<Writable, Writable, Writable, Writable>(
spec, mConfig);
+ reducer.setPartitionConstraint(new PartitionCountConstraint(job.getNumReduceTasks()));
HashPartitioningShuffleConnectorDescriptor conn = new HashPartitioningShuffleConnectorDescriptor(spec, mConfig);
spec.connect(conn, mapper, 0, reducer, 0);
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java
new file mode 100644
index 0000000..0d2ba77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount.WordCountMapper;
+import edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount.WordCountReducer;
+
+public class WordCountJobGen { // Command-line tool: assembles the word-count job settings and prints the resulting Hadoop configuration as XML on standard output.
+ public static void main(String[] args) throws Exception { // args[0] = input path, args[1] = output path; neither is validated here
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "word count");
+ job.setMapperClass(WordCountMapper.class);
+ job.setCombinerClass(WordCountReducer.class); // the reducer class is also registered as the combiner
+ job.setReducerClass(WordCountReducer.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ // The job is never submitted from this class; only its configuration is written out (presumably the XML that MapReduceMain accepts via -conf; TODO confirm).
+ job.getConfiguration().writeXml(System.out);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/DefaultInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/DefaultInputSplitProviderFactory.java
index 70a0103..08fd195 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/DefaultInputSplitProviderFactory.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/DefaultInputSplitProviderFactory.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class DefaultInputSplitProviderFactory implements IInputSplitProviderFactory {
+ private static final long serialVersionUID = 1L;
+
private MarshalledWritable<Configuration> mConfig;
public DefaultInputSplitProviderFactory(MarshalledWritable<Configuration> mConfig) {
@@ -34,7 +36,7 @@
HadoopHelper helper = new HadoopHelper(mConfig);
final List<InputSplit> splits;
try {
- JobContext jCtx = new JobContext(mConfig.get(), null);
+ JobContext jCtx = helper.createJobContext();
splits = helper.getInputFormat().getSplits(jCtx);
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
index a16c63d..79498bb 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/HadoopHelper.java
@@ -22,10 +22,13 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
@@ -51,11 +54,16 @@
public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException { // obtains the Configuration from mConfig and creates a Hadoop Job from it; any failure is rethrown as HyracksDataException
this.mConfig = mConfig;
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader(); // saved so the finally block can put it back
try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); // mConfig.get() and new Job(config) below run with this class's loader as the thread context loader
config = mConfig.get();
+ config.setClassLoader(getClass().getClassLoader()); // the Configuration itself is also pointed at this class's loader (presumably so job classes resolve against the application's classes; TODO confirm)
job = new Job(config);
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL); // restored whether construction succeeded or threw
}
}
@@ -73,6 +81,26 @@
}
}
+ public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) { // returns a new TaskAttemptContext over this helper's config for the given attempt id
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader(); // saved so it can be restored in finally
+ try {
+ Thread.currentThread().setContextClassLoader(config.getClassLoader()); // the context object is constructed while the config's class loader is the thread context loader
+ return new TaskAttemptContext(config, taId);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL); // the caller's context loader is reinstated even if construction throws
+ }
+ }
+
+ public JobContext createJobContext() { // returns a new JobContext over this helper's config
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader(); // saved so it can be restored in finally
+ try {
+ Thread.currentThread().setContextClassLoader(config.getClassLoader()); // the context object is constructed while the config's class loader is the thread context loader
+ return new JobContext(config, null); // the second constructor argument is deliberately passed as null
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL); // the caller's context loader is reinstated even if construction throws
+ }
+ }
+
public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
try {
return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
index 70ab817..7701b25 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
@@ -14,8 +14,10 @@
*/
package edu.uci.ics.hyracks.examples.onlineaggregation;
+import java.io.Serializable;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-public interface IInputSplitProviderFactory {
+public interface IInputSplitProviderFactory extends Serializable { // factory for IInputSplitProvider instances; implementations must be Serializable (presumably because the factory travels with the operator descriptor it is handed to; TODO confirm)
public IInputSplitProvider createInputSplitProvider() throws HyracksDataException; // returns an input split provider, or throws HyracksDataException if one cannot be created
}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
index 1362fa4..381b297 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
@@ -66,7 +67,8 @@
final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
final IInputSplitProvider isp = isProviderFactory.createInputSplitProvider();
- final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(conf, null);
+ final TaskAttemptID taId = new TaskAttemptID("foo", 0, true, 0, 0);
+ final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
final int framesLimit = helper.getSortFrameLimit(ctx);
final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
@@ -93,10 +95,6 @@
@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
- if (fta.getTupleCount() > 0) {
- runGen.nextFrame(frame);
- fta.reset(frame, true);
- }
}
@Override
@@ -119,6 +117,11 @@
}
public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
+ if (fta.getTupleCount() > 0) {
+ runGen.nextFrame(frame);
+ fta.reset(frame, true);
+ }
+ runGen.close();
IFrameWriter delegatingWriter = new IFrameWriter() {
@Override
public void open() throws HyracksDataException {
@@ -158,10 +161,18 @@
int blockId = isp.getBlockId();
try {
RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(is, taskAttemptContext);
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ recordReader.initialize(is, taskAttemptContext);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
recordWriter.initBlock(blockId);
- Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, null, recordReader,
+ Mapper<K1, V1, K2, V2>.Context mCtx = mapper.new Context(conf, taId, recordReader,
recordWriter, null, null, is);
mapper.run(mCtx);
+ recordReader.close();
recordWriter.sortAndFlushBlock(writer);
writer.flush();
} catch (IOException e) {
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
index 558f4f6..69ec344 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ReducerOperatorDescriptor.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.hyracks.examples.onlineaggregation;
import java.io.IOException;
@@ -10,9 +24,11 @@
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
@@ -105,7 +121,7 @@
return false;
}
++tIdx;
- if (accessor.getTupleCount() >= tIdx) {
+ if (accessor.getTupleCount() <= tIdx) {
++bPtr;
if (bPtr >= bSize) {
eog = true;
@@ -138,7 +154,8 @@
}
final KVIterator kvi = new KVIterator();
- final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(helper.getConfiguration(), null);
+ final TaskAttemptID taId = new TaskAttemptID("foo", 0, false, 0, 0);
+ final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
final RecordWriter recordWriter;
try {
recordWriter = helper.getOutputFormat().getRecordWriter(taskAttemptContext);
@@ -152,6 +169,8 @@
private List<ByteBuffer> group;
private int bPtr;
private FrameTupleAppender fta;
+ private Counter keyCounter;
+ private Counter valueCounter;
@Override
public void open() throws HyracksDataException {
@@ -161,6 +180,10 @@
bPtr = 0;
group.add(ctx.getResourceManager().allocateFrame());
fta = new FrameTupleAppender(ctx);
+ keyCounter = new Counter() {
+ };
+ valueCounter = new Counter() {
+ };
}
@Override
@@ -213,8 +236,8 @@
private void reduce() throws HyracksDataException {
kvi.reset(group, bPtr + 1);
try {
- Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), null, kvi,
- null, null, recordWriter, null, null,
+ Reducer<K2, V2, K3, V3>.Context rCtx = reducer.new Context(helper.getConfiguration(), taId, kvi,
+ keyCounter, valueCounter, recordWriter, null, null,
(RawComparator<K2>) helper.getRawGroupingComparator(), (Class<K2>) helper.getJob()
.getMapOutputKeyClass(), (Class<V2>) helper.getJob().getMapOutputValueClass());
reducer.run(rCtx);
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
index e03d0c9..56b4b28 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/ShuffleFrameReader.java
@@ -193,7 +193,11 @@
buffer.limit(buffer.capacity());
buffer.position(0);
try {
- channel.write(buffer);
+ int remaining = buffer.remaining();
+ while (remaining > 0) {
+ int wLen = channel.write(buffer);
+ remaining -= wLen;
+ }
} catch (IOException e) {
throw new HyracksDataException(e);
}
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountMapper.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountMapper.java
new file mode 100644
index 0000000..17383ba
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountMapper.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // word-count mapper: writes (token, 1) for each token found in an input value
+ private static final IntWritable ONE = new IntWritable(1); // the count written for every token
+ // A single Text instance, refilled for each token rather than allocated per write.
+ private Text word = new Text();
+ /** Splits value with StringTokenizer's default delimiters and writes (token, ONE) once per token; key is not read. */
+ @Override
+ public void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
+ StringTokenizer tok = new StringTokenizer(value.toString());
+ while (tok.hasMoreTokens()) {
+ word.set(tok.nextToken());
+ ctx.write(word, ONE);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountReducer.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountReducer.java
new file mode 100644
index 0000000..eb4e926
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/sample/wordcount/WordCountReducer.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // word-count reducer: writes each key with the sum of its values
+ private IntWritable result = new IntWritable(); // a single output holder, overwritten on every reduce call
+ /** Adds up every value supplied for key and writes one (key, sum) pair to ctx. */
+ @Override
+ public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
+ int sum = 0; // plain int accumulator: a total beyond Integer.MAX_VALUE would wrap around
+ for (IntWritable val : values) {
+ sum += val.get();
+ }
+ result.set(sum);
+ ctx.write(key, result);
+ }
+}
\ No newline at end of file