Reverted r480 -- mistaken deletion of text-example
git-svn-id: 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/text-example/textclient/.classpath b/hyracks-examples/text-example/textclient/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <classpathentry kind="src" output="target/classes" path="src/main/java"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-examples/text-example/textclient/.project b/hyracks-examples/text-example/textclient/.project
new file mode 100644
index 0000000..04307d3
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <name>textclient</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ </natures>
diff --git a/hyracks-examples/text-example/textclient/.settings/org.eclipse.jdt.core.prefs b/hyracks-examples/text-example/textclient/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..8599738
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Tue Sep 28 14:37:42 PDT 2010
diff --git a/hyracks-examples/text-example/textclient/.settings/org.maven.ide.eclipse.prefs b/hyracks-examples/text-example/textclient/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..4562b1a
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Tue Sep 28 14:37:42 PDT 2010
+resourceFilterGoals=process-resources resources\:testResources
diff --git a/hyracks-examples/text-example/textclient/pom.xml b/hyracks-examples/text-example/textclient/pom.xml
new file mode 100644
index 0000000..b85914a
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/pom.xml
@@ -0,0 +1,80 @@
+<project xmlns="" xmlns:xsi="" xsi:schemaLocation="">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+ <artifactId>textclient</artifactId>
+ <version>0.1.6-SNAPSHOT</version>
+ <parent>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>text-example</artifactId>
+ <version>0.1.6-SNAPSHOT</version>
+ </parent>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.1.6-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+ <artifactId>texthelper</artifactId>
+ <version>0.1.6-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.examples.text.client.WordCountMain</mainClass>
+ <name>textclient</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
diff --git a/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml b/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/
new file mode 100644
index 0000000..8d22bca
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/
@@ -0,0 +1,305 @@
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.text.client;
+import java.util.UUID;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+ * The application client for the performance tests of the external hash group
+ * operator.
+ */
+public class ExternalGroupClient {
+ private static class Options {
+ @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+ public String host;
+ @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+ public int port = 1099;
+ @Option(name = "-app", usage = "Hyracks Application name", required = true)
+ public String app;
+ @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+ public String inFileSplits;
+ @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+ public String outFileSplits;
+ @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
+ public int htSize = 8191;
+ @Option(name = "-frames-limit", usage = "Frame size (default: 32768)", required = false)
+ public int framesLimit = 32768;
+ @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
+ public int sbSize = 512;
+ @Option(name = "-sort-output", usage = "Whether to sort the output (default: true)", required = false)
+ public boolean sortOutput = false;
+ @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
+ public boolean outPlain = true;
+ }
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+ IHyracksClientConnection hcc = new HyracksRMIConnection(, options.port);
+ JobSpecification job;
+ for (int i = 0; i < 3; i++) {
+ long start = System.currentTimeMillis();
+ job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i % 2),
+ options.htSize, options.sbSize, options.framesLimit, options.sortOutput, i % 2, options.outPlain);
+ System.out.print(i + "\t" + (System.currentTimeMillis() - start));
+ start = System.currentTimeMillis();
+ UUID jobId = hcc.createJob(, job);
+ hcc.start(jobId);
+ hcc.waitForCompletion(jobId);
+ System.out.println("\t" + (System.currentTimeMillis() - start));
+ }
+ }
+ private static FileSplit[] parseFileSplits(String fileSplits) {
+ String[] splits = fileSplits.split(",");
+ FileSplit[] fSplits = new FileSplit[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ String s = splits[i].trim();
+ int idx = s.indexOf(':');
+ if (idx < 0) {
+ throw new IllegalArgumentException("File split " + s + " not well formed");
+ }
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ }
+ return fSplits;
+ }
+ private static FileSplit[] parseFileSplits(String fileSplits, int count) {
+ String[] splits = fileSplits.split(",");
+ FileSplit[] fSplits = new FileSplit[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ String s = splits[i].trim();
+ int idx = s.indexOf(':');
+ if (idx < 0) {
+ throw new IllegalArgumentException("File split " + s + " not well formed");
+ }
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
+ + count)));
+ }
+ return fSplits;
+ }
+ private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
+ int framesLimit, boolean sortOutput, int alg, boolean outPlain) {
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+ RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
+ createPartitionConstraint(spec, fileScanner, inSplits);
+ // Output: each unique string with an integer count
+ RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE,
+ // IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+ // Specify the grouping key, which will be the string extracted during
+ // the scan.
+ int[] keys = new int[] { 0,
+ // 1
+ };
+ AbstractOperatorDescriptor grouper;
+ switch (alg) {
+ case 0: // External hash group
+ grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
+ htSize);
+ createPartitionConstraint(spec, grouper, outSplits);
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
+ break;
+ case 1: // External sort + pre-cluster
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
+ new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
+ createPartitionConstraint(spec, sorter, inSplits);
+ // Connect scan operator with the sorter
+ IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
+ grouper = new PreclusteredGroupOperatorDescriptor(spec, keys, new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc);
+ createPartitionConstraint(spec, grouper, outSplits);
+ // Connect sorter with the pre-cluster
+ OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(sortGroupConn, sorter, 0, grouper, 0);
+ break;
+ case 2: // In-memory hash group
+ grouper = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
+ htSize);
+ createPartitionConstraint(spec, grouper, outSplits);
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanConn, fileScanner, 0, grouper, 0);
+ break;
+ default:
+ grouper = new ExternalHashGroupOperatorDescriptor(spec, keys, framesLimit, false,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
+ htSize);
+ createPartitionConstraint(spec, grouper, outSplits);
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanGroupConnDef = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
+ }
+ IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+ AbstractSingleActivityOperatorDescriptor writer;
+ if (outPlain)
+ writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+ else
+ writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+ createPartitionConstraint(spec, writer, outSplits);
+ IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(groupOutConn, grouper, 0, writer, 0);
+ spec.addRoot(writer);
+ return spec;
+ }
+ private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+ String[] parts = new String[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ parts[i] = splits[i].getNodeName();
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
+ }
\ No newline at end of file
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/
new file mode 100644
index 0000000..a0f24c4
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/
@@ -0,0 +1,190 @@
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.text.client;
+import java.util.UUID;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.text.WordTupleParserFactory;
+public class WordCountMain {
+ private static class Options {
+ @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+ public String host;
+ @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+ public int port = 1099;
+ @Option(name = "-app", usage = "Hyracks Application name", required = true)
+ public String app;
+ @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+ public String inFileSplits;
+ @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+ public String outFileSplits;
+ @Option(name = "-algo", usage = "Use Hash based grouping", required = true)
+ public String algo;
+ @Option(name = "-format", usage = "Specify output format: binary/text (default: text)", required = false)
+ public String format = "text";
+ @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
+ public int htSize = 8191;
+ @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
+ public int sbSize = 32768;
+ }
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+ IHyracksClientConnection hcc = new HyracksRMIConnection(, options.port);
+ JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
+ options.algo, options.htSize, options.sbSize, options.format);
+ long start = System.currentTimeMillis();
+ UUID jobId = hcc.createJob(, job);
+ hcc.start(jobId);
+ hcc.waitForCompletion(jobId);
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
+ }
+ private static FileSplit[] parseFileSplits(String fileSplits) {
+ String[] splits = fileSplits.split(",");
+ FileSplit[] fSplits = new FileSplit[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ String s = splits[i].trim();
+ int idx = s.indexOf(':');
+ if (idx < 0) {
+ throw new IllegalArgumentException("File split " + s + " not well formed");
+ }
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ }
+ return fSplits;
+ }
+ private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, String algo, int htSize,
+ int sbSize, String format) {
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+ RecordDescriptor wordDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor wordScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+ new WordTupleParserFactory(), wordDesc);
+ createPartitionConstraint(spec, wordScanner, inSplits);
+ RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ IOperatorDescriptor gBy;
+ int[] keys = new int[] { 0 };
+ if ("hash".equalsIgnoreCase(algo)) {
+ gBy = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ groupResultDesc, htSize);
+ createPartitionConstraint(spec, gBy, outSplits);
+ IConnectorDescriptor scanGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
+ } else {
+ IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE };
+ IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo) ? new InMemorySortOperatorDescriptor(spec,
+ keys, new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc)
+ : new ExternalSortOperatorDescriptor(spec, sbSize, keys,
+ new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
+ createPartitionConstraint(spec, sorter, outSplits);
+ IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanSortConn, wordScanner, 0, sorter, 0);
+ gBy = new PreclusteredGroupOperatorDescriptor(spec, keys,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ groupResultDesc);
+ createPartitionConstraint(spec, gBy, outSplits);
+ OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(sortGroupConn, sorter, 0, gBy, 0);
+ }
+ IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+ IOperatorDescriptor writer = "text".equalsIgnoreCase(format) ? new PlainFileWriterOperatorDescriptor(spec,
+ outSplitProvider, ",") : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+ createPartitionConstraint(spec, writer, outSplits);
+ IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(gbyPrinterConn, gBy, 0, writer, 0);
+ spec.addRoot(writer);
+ return spec;
+ }
+ private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+ String[] parts = new String[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ parts[i] = splits[i].getNodeName();
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
+ }
\ No newline at end of file