Fix bugs, add WordCountJobGen (wcgen) and set map/reduce partition counts in MapReduceMain; word count now works

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_online_aggregation@199 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
index ec9b303..4607c4c 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/pom.xml
@@ -44,9 +44,13 @@
             <configuration>
               <programs>
                 <program>
-                  <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountMain</mainClass>
+                  <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.MapReduceMain</mainClass>
                   <name>onlineaggregationclient</name>
                 </program>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.examples.onlineaggregation.client.WordCountJobGen</mainClass>
+                  <name>wcgen</name>
+                </program>
               </programs>
               <repositoryLayout>flat</repositoryLayout>
               <repositoryName>lib</repositoryName>
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
index 57a6aac..f826eef 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
@@ -19,11 +19,13 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
 import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.examples.onlineaggregation.DefaultInputSplitProviderFactory;
 import edu.uci.ics.hyracks.examples.onlineaggregation.HashPartitioningShuffleConnectorDescriptor;
@@ -44,6 +46,9 @@
 
         @Option(name = "-conf", usage = "Configuration XML File", required = true)
         public File conf;
+
+        @Option(name = "-num-maps", usage = "Number of Map tasks (default: 1)")
+        public int numMaps = 1;
     }
 
     public static void main(String[] args) throws Exception {
@@ -56,7 +61,7 @@
 
         IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
 
-        JobSpecification job = createJob(conf);
+        JobSpecification job = createJob(options, conf);
 
         long start = System.currentTimeMillis();
         UUID jobId = hcc.createJob(options.app, job);
@@ -66,16 +71,20 @@
         System.err.println(start + " " + end + " " + (end - start));
     }
 
-    private static JobSpecification createJob(Configuration conf) throws Exception {
+    private static JobSpecification createJob(Options options, Configuration conf) throws Exception {
         JobSpecification spec = new JobSpecification();
 
         MarshalledWritable<Configuration> mConfig = new MarshalledWritable<Configuration>();
         mConfig.set(conf);
+        Job job = new Job(conf);
+
         MapperOperatorDescriptor<Writable, Writable, Writable, Writable> mapper = new MapperOperatorDescriptor<Writable, Writable, Writable, Writable>(
                 spec, mConfig, new DefaultInputSplitProviderFactory(mConfig));
+        mapper.setPartitionConstraint(new PartitionCountConstraint(options.numMaps));
 
         ReducerOperatorDescriptor<Writable, Writable, Writable, Writable> reducer = new ReducerOperatorDescriptor<Writable, Writable, Writable, Writable>(
                 spec, mConfig);
+        reducer.setPartitionConstraint(new PartitionCountConstraint(job.getNumReduceTasks()));
 
         HashPartitioningShuffleConnectorDescriptor conn = new HashPartitioningShuffleConnectorDescriptor(spec, mConfig);
         spec.connect(conn, mapper, 0, reducer, 0);
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java
new file mode 100644
index 0000000..0d2ba77
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/WordCountJobGen.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.onlineaggregation.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount.WordCountMapper;
+import edu.uci.ics.hyracks.examples.onlineaggregation.sample.wordcount.WordCountReducer;
+
+/**
+ * Generates the Hadoop configuration of a word-count job and writes it to
+ * standard output as XML.
+ * <p>
+ * Usage: {@code wcgen <input-path> <output-path>}
+ */
+public class WordCountJobGen {
+    /**
+     * Builds the word-count job configuration and prints it to stdout as XML.
+     *
+     * @param args {@code args[0]} is the job's input path and {@code args[1]} its output path;
+     *             any further arguments are ignored
+     * @throws Exception if the job cannot be created or the XML cannot be written
+     */
+    public static void main(String[] args) throws Exception {
+        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
+        if (args.length < 2) {
+            System.err.println("Usage: wcgen <input-path> <output-path>");
+            System.exit(1);
+        }
+        Configuration conf = new Configuration();
+        Job job = new Job(conf, "word count");
+        job.setMapperClass(WordCountMapper.class);
+        // NOTE(review): reusing the reducer as combiner assumes its aggregation is
+        // associative and commutative (e.g. summing counts) - confirm in WordCountReducer.
+        job.setCombinerClass(WordCountReducer.class);
+        job.setReducerClass(WordCountReducer.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+        // Only the configuration is emitted; the job itself is never submitted here.
+        job.getConfiguration().writeXml(System.out);
+    }
+}
\ No newline at end of file