Fixed word count for presorted aggregation

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@133 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index 96045d2..b7a5639 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -70,8 +70,8 @@
         @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
         public String outFileSplits;
 
-        @Option(name = "-use-hash", usage = "Use Hash based grouping", required = true)
-        public boolean useHash;
+        @Option(name = "-algo", usage = "Use Hash based grouping", required = true)
+        public String algo;
 
         @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
         public int htSize = 8191;
@@ -88,7 +88,7 @@
         IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
 
         JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
-                options.useHash, options.htSize, options.sbSize);
+                options.algo, options.htSize, options.sbSize);
 
         long start = System.currentTimeMillis();
         UUID jobId = hcc.createJob(options.app, job);
@@ -112,7 +112,7 @@
         return fSplits;
     }
 
-    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, boolean useHash, int htSize,
+    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, String algo, int htSize,
             int sbSize) {
         JobSpecification spec = new JobSpecification();
 
@@ -128,7 +128,7 @@
                 UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         IOperatorDescriptor gBy;
-        if (useHash) {
+        if ("hash".equalsIgnoreCase(algo)) {
             gBy = new HashGroupOperatorDescriptor(spec, new int[] { 0 },
                     new FieldHashPartitionComputerFactory(new int[] { 0 },
                             new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
@@ -144,6 +144,7 @@
         } else {
             ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, sbSize, new int[] { 0 },
                     new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, wordDesc);
+            sorter.setPartitionConstraint(createPartitionConstraint(outSplits));
 
             IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
                     new FieldHashPartitionComputerFactory(new int[] { 0 },
@@ -155,6 +156,7 @@
                     new MultiAggregatorFactory(
                             new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
                     groupResultDesc);
+            gBy.setPartitionConstraint(createPartitionConstraint(outSplits));
             OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
             spec.connect(sortGroupConn, sorter, 0, gBy, 0);
         }