Implemented the memory-bounded HashGroupby and HashJoin for BigObject
It contains both hash groupby and hash join changes.
The main changes are:
1. Update the ExternalGroupby to a hash-based groupby.
2. Update the Join operators to use the buffer manager.
The buffer manager is moved from the Sort package to an upper
level so that it can be shared by all the operators.
Change-Id: I248f3a374fdacad7d57e49cf18d8233745e55460
Reviewed-on: https://asterix-gerrit.ics.uci.edu/398
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks/hyracks-examples/text-example/textclient/pom.xml
index 3bf69e2..a88c421 100644
--- a/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -67,24 +67,7 @@
<goal>assemble</goal>
</goals>
</execution>
- <execution>
- <id>groupclient</id>
- <configuration>
- <programs>
- <program>
- <mainClass>org.apache.hyracks.examples.text.client.ExternalGroupClient</mainClass>
- <name>groupclient</name>
- </program>
- </programs>
- <repositoryLayout>flat</repositoryLayout>
- <repositoryName>lib</repositoryName>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>assemble</goal>
- </goals>
- </execution>
- </executions>
+ </executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
deleted file mode 100644
index 1ae3258..0000000
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.examples.text.client;
-
-import java.io.File;
-
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-
-/**
- * The application client for the performance tests of the external hash group
- * operator.
- */
-public class ExternalGroupClient {
- private static class Options {
- @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
- public String host;
-
- @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
- public int port = 1098;
-
- @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
- public String inFileSplits;
-
- @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
- public String outFileSplits;
-
- @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
- public int htSize = 8191;
-
- @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
- public int frameSize = 32768;
-
- @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
- public int sbSize = 512;
-
- @Option(name = "-sort-output", usage = "Whether to sort the output (default: true)", required = false)
- public boolean sortOutput = false;
-
- @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
- public boolean outPlain = true;
-
- @Option(name = "-algo", usage = "The algorithm to be used", required = true)
- public int algo;
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
-
- IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
-
- JobSpecification job;
-
- for (int i = 0; i < 6; i++) {
- long start = System.currentTimeMillis();
- job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i),
- options.htSize, options.sbSize, options.frameSize, options.sortOutput, options.algo,
- options.outPlain);
-
- System.out.print(i + "\t" + (System.currentTimeMillis() - start));
- start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
- System.out.println("\t" + (System.currentTimeMillis() - start));
- }
- }
-
- private static FileSplit[] parseFileSplits(String fileSplits) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
- }
- return fSplits;
- }
-
- private static FileSplit[] parseFileSplits(String fileSplits, int count) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
- + count)));
- }
- return fSplits;
- }
-
- private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
- int frameSize, boolean sortOutput, int alg, boolean outPlain) {
- JobSpecification spec = new JobSpecification(frameSize);
- IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
-
- RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
- FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
-
- createPartitionConstraint(spec, fileScanner, inSplits);
-
- // Output: each unique string with an integer count
- RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE,
- // IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
-
- // Specify the grouping key, which will be the string extracted during
- // the scan.
- int[] keys = new int[] { 0,
- // 1
- };
-
- AbstractOperatorDescriptor grouper;
-
- switch (alg) {
- case 0: // new external hash graph
- grouper = new org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
- keys, frameSize, new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
- false) }), outDesc, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
-
- break;
- case 1: // External-sort + new-precluster
- ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, frameSize, keys,
- new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, inDesc);
- createPartitionConstraint(spec, sorter2, inSplits);
-
- // Connect scan operator with the sorter
- IConnectorDescriptor scanSortConn2 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
-
- grouper = new org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor(
- spec, keys, new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- outDesc);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect sorter with the pre-cluster
- OneToOneConnectorDescriptor sortGroupConn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
- break;
- case 2: // Inmem
- grouper = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }),
- new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- outDesc, htSize);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanConn2 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanConn2, fileScanner, 0, grouper, 0);
- break;
- default:
- grouper = new org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
- keys, frameSize, new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
- false) }), outDesc, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
- }
-
- IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
-
- AbstractSingleActivityOperatorDescriptor writer;
-
- if (outPlain)
- writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
- else
- writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
-
- createPartitionConstraint(spec, writer, outSplits);
-
- IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(groupOutConn, grouper, 0, writer, 0);
-
- spec.addRoot(writer);
- return spec;
- }
-
- private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
- String[] parts = new String[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- parts[i] = splits[i].getNodeName();
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
- }
-}
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index b9cbb9d..22a571f 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -21,9 +21,6 @@
import java.io.File;
import java.util.EnumSet;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
@@ -31,6 +28,7 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.io.FileReference;
@@ -39,6 +37,7 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -52,14 +51,19 @@
import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
import org.apache.hyracks.examples.text.WordTupleParserFactory;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
public class WordCountMain {
private static class Options {
@@ -84,8 +88,8 @@
@Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
public int htSize = 8191;
- @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
- public int sbSize = 32768;
+ @Option(name = "-frame-limit", usage = "Memory limit in frames (default:4)", required = false)
+ public int memFrameLimit = 10;
@Option(name = "-runtime-profiling", usage = "Indicates if runtime profiling should be enabled. (default: false)")
public boolean runtimeProfiling = false;
@@ -94,6 +98,8 @@
public int frameSize = 32768;
}
+ private static long fileSize = 0;
+
public static void main(String[] args) throws Exception {
Options options = new Options();
CmdLineParser parser = new CmdLineParser(options);
@@ -102,7 +108,7 @@
IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
- options.algo, options.htSize, options.sbSize, options.format, options.frameSize);
+ options.algo, options.htSize, options.memFrameLimit, options.format, options.frameSize);
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job,
@@ -121,13 +127,15 @@
if (idx < 0) {
throw new IllegalArgumentException("File split " + s + " not well formed");
}
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ File file = new File(s.substring(idx + 1));
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(file));
+ fileSize += file.length();
}
return fSplits;
}
private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, String algo, int htSize,
- int sbSize, String format, int frameSize) {
+ int frameLimit, String format, int frameSize) {
JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
@@ -144,40 +152,39 @@
IOperatorDescriptor gBy;
int[] keys = new int[] { 0 };
if ("hash".equalsIgnoreCase(algo)) {
- gBy = new HashGroupOperatorDescriptor(
- spec,
- keys,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
+ gBy = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- groupResultDesc, htSize);
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
+ new FloatSumFieldAggregatorFactory(5, false) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
+ new FloatSumFieldAggregatorFactory(3, false) }),
+ groupResultDesc, groupResultDesc, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+
createPartitionConstraint(spec, gBy, outSplits);
IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
} else {
- IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) };
- IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo) ? new InMemorySortOperatorDescriptor(spec,
- keys, new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc)
- : new ExternalSortOperatorDescriptor(spec, sbSize, keys,
+ IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+ IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo)
+ ? new InMemorySortOperatorDescriptor(spec, keys, new UTF8StringNormalizedKeyComputerFactory(), cfs,
+ wordDesc)
+ : new ExternalSortOperatorDescriptor(spec, frameLimit, keys,
new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
createPartitionConstraint(spec, sorter, outSplits);
IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(scanSortConn, wordScanner, 0, sorter, 0);
- gBy = new PreclusteredGroupOperatorDescriptor(
- spec,
- keys,
+ gBy = new PreclusteredGroupOperatorDescriptor(spec, keys,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
@@ -188,8 +195,9 @@
}
IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
- IOperatorDescriptor writer = "text".equalsIgnoreCase(format) ? new PlainFileWriterOperatorDescriptor(spec,
- outSplitProvider, ",") : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+ IOperatorDescriptor writer = "text".equalsIgnoreCase(format)
+ ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
+ : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
createPartitionConstraint(spec, writer, outSplits);
IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);