Fixed presorted aggregation in the word-count example

The sort-based (presorted) grouping path in WordCountMain did not work:
neither the external sorter nor the downstream group-by operator was given
a partition constraint. This change pins both operators of the presorted
branch to the output file splits via createPartitionConstraint(outSplits),
and replaces the boolean -use-hash client option with a String -algo option
("hash" selects hash-based grouping; any other value selects the sort-based
pipeline).
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@133 123451ca-8445-de46-9d55-352943316053
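
A minimal args4j sketch of the option-handling change (a hypothetical
standalone class, not part of the patch): a String-valued -algo option
replaces the old boolean -use-hash flag, using the same dispatch rule the
patched createJob() applies.

    import org.kohsuke.args4j.CmdLineParser;
    import org.kohsuke.args4j.Option;

    public class AlgoOptionDemo {
        // Mirrors the patched option: any value other than "hash" selects
        // the sort-based (presorted) pipeline.
        @Option(name = "-algo", usage = "Grouping algorithm to use: hash or sort", required = true)
        public String algo;

        public static void main(String[] args) throws Exception {
            AlgoOptionDemo options = new AlgoOptionDemo();
            new CmdLineParser(options).parseArgument(args);
            if ("hash".equalsIgnoreCase(options.algo)) {
                System.out.println("hash-based grouping selected");
            } else {
                System.out.println("external sort + presorted group selected");
            }
        }
    }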
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index 96045d2..b7a5639 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -70,8 +70,8 @@
@Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
public String outFileSplits;
- @Option(name = "-use-hash", usage = "Use Hash based grouping", required = true)
- public boolean useHash;
+ @Option(name = "-algo", usage = "Use Hash based grouping", required = true)
+ public String algo;
@Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
public int htSize = 8191;
@@ -88,7 +88,7 @@
IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
- options.useHash, options.htSize, options.sbSize);
+ options.algo, options.htSize, options.sbSize);
long start = System.currentTimeMillis();
UUID jobId = hcc.createJob(options.app, job);
@@ -112,7 +112,7 @@
return fSplits;
}
- private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, boolean useHash, int htSize,
+ private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, String algo, int htSize,
int sbSize) {
JobSpecification spec = new JobSpecification();
@@ -128,7 +128,7 @@
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
IOperatorDescriptor gBy;
- if (useHash) {
+ if ("hash".equalsIgnoreCase(algo)) {
gBy = new HashGroupOperatorDescriptor(spec, new int[] { 0 },
new FieldHashPartitionComputerFactory(new int[] { 0 },
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
@@ -144,6 +144,7 @@
} else {
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, sbSize, new int[] { 0 },
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, wordDesc);
+ sorter.setPartitionConstraint(createPartitionConstraint(outSplits));
IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 0 },
@@ -155,6 +156,7 @@
new MultiAggregatorFactory(
new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
groupResultDesc);
+ gBy.setPartitionConstraint(createPartitionConstraint(outSplits));
OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
spec.connect(sortGroupConn, sorter, 0, gBy, 0);
}
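
For reference, the fixed presorted branch as it reads after the patch,
annotated with why each added constraint matters. This is a sketch, not a
drop-in file: it assumes the surrounding method context from WordCountMain
(spec, sbSize, wordDesc, outSplits, the private createPartitionConstraint
helper, and the existing imports), and the group operator gBy is assumed to
be constructed as in the original file, since the first lines of its
construction fall outside the quoted hunks.

    // External sort on the word field (key column 0).
    ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, sbSize, new int[] { 0 },
            new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, wordDesc);
    // Added by this patch: without a partition constraint the sorter has no
    // assigned partition locations; pin it to the nodes of the output splits.
    sorter.setPartitionConstraint(createPartitionConstraint(outSplits));

    // Hash-repartition scanned words onto the sorter so every occurrence of
    // a given word reaches the same partition (this connector is hooked to
    // the scanner elsewhere in the method).
    IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
            new FieldHashPartitionComputerFactory(new int[] { 0 },
                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));

    // gBy: the presorted group-by counting per word, constructed as in the
    // original file. Added by this patch: give it the same locations as the
    // sorter so the one-to-one connector below lines the partitions up.
    gBy.setPartitionConstraint(createPartitionConstraint(outSplits));
    OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
    spec.connect(sortGroupConn, sorter, 0, gBy, 0);

Pinning both operators to the same locations keeps the sort-to-group edge
local: a OneToOneConnectorDescriptor connects partition i of the producer to
partition i of the consumer and cannot repartition, so the sorter and the
group-by must agree on where, and in how many partitions, they run.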