Added a Binary Preclustered Group Operator. The Preclustered and hash-based group operators now use the same aggregator interface. Added a Frame Writing Operator.
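
For reference, a minimal sketch of the shared aggregator interface (variable names such as keys, hashPartitioner, comparators, outDesc, and htSize are placeholders; the full wiring is shown in the new WordCountMain example below). Both group operators now take the same MultiAggregatorFactory:

    // Sketch only; argument order follows the calls in WordCountMain.
    MultiAggregatorFactory agg = new MultiAggregatorFactory(
            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() });
    // Hash-based grouping:
    new HashGroupOperatorDescriptor(spec, keys, hashPartitioner, comparators, agg, outDesc, htSize);
    // Preclustered (sorted-input) grouping:
    new PreclusteredGroupOperatorDescriptor(spec, keys, comparators, agg, outDesc);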

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@132 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml b/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
new file mode 100644
index 0000000..96045d2
--- /dev/null
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.text.client;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.text.WordTupleParserFactory;
+
+public class WordCountMain {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        public int port = 1099;
+
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+
+        @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+        public String inFileSplits;
+
+        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+        public String outFileSplits;
+
+        @Option(name = "-use-hash", usage = "Use Hash based grouping", required = true)
+        public boolean useHash;
+
+        @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
+        public int htSize = 8191;
+
+        @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
+        public int sbSize = 32768;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+
+        JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
+                options.useHash, options.htSize, options.sbSize);
+
+        long start = System.currentTimeMillis();
+        UUID jobId = hcc.createJob(options.app, job);
+        hcc.start(jobId);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    private static FileSplit[] parseFileSplits(String fileSplits) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new File(s.substring(idx + 1)));
+        }
+        return fSplits;
+    }
+
+    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, boolean useHash, int htSize,
+            int sbSize) {
+        JobSpecification spec = new JobSpecification();
+
+        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+        RecordDescriptor wordDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor wordScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+                new WordTupleParserFactory(), wordDesc);
+        wordScanner.setPartitionConstraint(createPartitionConstraint(inSplits));
+
+        RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        IOperatorDescriptor gBy;
+        if (useHash) {
+            gBy = new HashGroupOperatorDescriptor(spec, new int[] { 0 },
+                    new FieldHashPartitionComputerFactory(new int[] { 0 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new MultiAggregatorFactory(
+                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    groupResultDesc, htSize);
+            gBy.setPartitionConstraint(createPartitionConstraint(outSplits));
+            IConnectorDescriptor scanGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 0 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
+        } else {
+            ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, sbSize, new int[] { 0 },
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, wordDesc);
+
+            IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 0 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanSortConn, wordScanner, 0, sorter, 0);
+
+            gBy = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new MultiAggregatorFactory(
+                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    groupResultDesc);
+            OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
+            spec.connect(sortGroupConn, sorter, 0, gBy, 0);
+        }
+
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        writer.setPartitionConstraint(createPartitionConstraint(outSplits));
+
+        IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(gbyPrinterConn, gBy, 0, writer, 0);
+
+        spec.addRoot(writer);
+        return spec;
+    }
+
+    private static PartitionConstraint createPartitionConstraint(FileSplit[] splits) {
+        LocationConstraint[] lConstraints = new LocationConstraint[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            lConstraints[i] = new AbsoluteLocationConstraint(splits[i].getNodeName());
+        }
+        return new ExplicitPartitionConstraint(lConstraints);
+    }
+}
\ No newline at end of file