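Make the word-count example work: set map/reduce partition counts in MapReduceMain (new -num-maps option) and add the wcgen (WordCountJobGen) configuration generator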
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_online_aggregation@199 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
index ec9b303..4607c4c 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
@@ -44,9 +44,13 @@
<configuration>
<programs>
<program>
- <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountMain</mainClass>
+ <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.MapReduceMain</mainClass>
<name>onlineaggregationclient</name>
</program>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountJobGen</mainClass>
+ <name>wcgen</name>
+ </program>
</programs>
<repositoryLayout>flat</repositoryLayout>
<repositoryName>lib</repositoryName>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
index 57a6aac..f826eef 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
@@ -19,11 +19,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.examples.onlineaggregation.DefaultInputSplitProviderFactory;
import edu.uci.ics.hyracks.examples.onlineaggregation.HashPartitioningShuffleConnectorDescriptor;
@@ -44,6 +46,9 @@
@Option(name = "-conf", usage = "Configuration XML File", required = true)
public File conf;
+
+ @Option(name = "-num-maps", usage = "Number of Map tasks (default: 1)")
+ public int numMaps = 1;
}
public static void main(String[] args) throws Exception {
@@ -56,7 +61,7 @@
IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
- JobSpecification job = createJob(conf);
+ JobSpecification job = createJob(options, conf);
long start = System.currentTimeMillis();
UUID jobId = hcc.createJob(options.app, job);
@@ -66,16 +71,20 @@
System.err.println(start + " " + end + " " + (end - start));
}
- private static JobSpecification createJob(Configuration conf) throws Exception {
+ private static JobSpecification createJob(Options options, Configuration conf) throws Exception {
JobSpecification spec = new JobSpecification();
MarshalledWritable<Configuration> mConfig = new MarshalledWritable<Configuration>();
mConfig.set(conf);
+ Job job = new Job(conf);
+
MapperOperatorDescriptor<Writable, Writable, Writable, Writable> mapper = new MapperOperatorDescriptor<Writable, Writable, Writable, Writable>(
spec, mConfig, new DefaultInputSplitProviderFactory(mConfig));
+ mapper.setPartitionConstraint(new PartitionCountConstraint(options.numMaps));
ReducerOperatorDescriptor<Writable, Writable, Writable, Writable> reducer = new ReducerOperatorDescriptor<Writable, Writable, Writable, Writable>(
spec, mConfig);
+ reducer.setPartitionConstraint(new PartitionCountConstraint(job.getNumReduceTasks()));
HashPartitioningShuffleConnectorDescriptor conn = new HashPartitioningShuffleConnectorDescriptor(spec, mConfig);
spec.connect(conn, mapper, 0, reducer, 0);
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java
new file mode 100644
index 0000000..0d2ba77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount.WordCountMapper;
+import edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount.WordCountReducer;
+
+/**
+ * Generates the Hadoop job configuration for the word-count sample and writes it to
+ * standard output as XML (presumably for use as the {@code -conf} file of
+ * {@code MapReduceMain}).
+ */
+public class WordCountJobGen {
+    private WordCountJobGen() {
+        // Entry-point holder only; not meant to be instantiated.
+    }
+
+    /**
+     * Builds the word-count job and prints its configuration to {@code System.out} as XML.
+     *
+     * @param args
+     *            {@code args[0]} is the input path and {@code args[1]} the output path;
+     *            any further arguments are ignored
+     * @throws Exception
+     *             if the job cannot be created or the XML cannot be written
+     */
+    public static void main(String[] args) throws Exception {
+        // Report bad usage on stderr so it never mixes with the XML written to stdout.
+        if (args.length < 2) {
+            System.err.println("Usage: WordCountJobGen <input path> <output path>");
+            System.exit(1);
+        }
+        Configuration conf = new Configuration();
+        Job job = new Job(conf, "word count");
+        job.setMapperClass(WordCountMapper.class);
+        // WordCountReducer is reused as the combiner (assumes it is associative and
+        // commutative over counts -- TODO confirm).
+        job.setCombinerClass(WordCountReducer.class);
+        job.setReducerClass(WordCountReducer.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+        job.getConfiguration().writeXml(System.out);
+    }
+}