Bug fixes:
- external aggregation integration test: fixed the temporary output path generation (output files are now created through createTempFile() instead of paths hand-built under java.io.tmpdir);
- UTF-8 compilation failure of storage-am-invertedindex-test on Mac (the Maven compiler plugin now sets the source encoding explicitly).
Feature improvements:
- tpch-join client: added a join algorithm selector "-algo" (usage illustrated below); added parameters for the grace/hybrid hash join operators ("-mem-size", "-input-size", "-records-per-frame", "-grace-factor") and "-table-size" for the in-memory hash join; added "-has-groupby" to control whether a group-by is performed after the join.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@832 123451ca-8445-de46-9d55-352943316053
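
For illustration, a hedged usage sketch of the new flags. Option names are taken verbatim from the @Option declarations in the Main.java diff below; the client's pre-existing connection and file-split options predate this change and are elided, and the values shown are placeholders, not recommendations:

    // Usage sketch: the new tpch-join client flags. These would be appended
    // to the existing connection and file-split arguments when launching
    // edu.uci.ics.hyracks.examples.tpch.client.Main.
    import java.util.Arrays;

    public class TpchJoinUsageSketch {
        public static void main(String[] args) {
            String[] newJoinFlags = {
                    "-algo", "hybridhash",       // nestedloop | gracehash | hybridhash; anything else -> in-memory hash
                    "-mem-size", "64",           // required; join memory budget (also bounds the nested-loop join)
                    "-input-size", "10",         // grace/hybrid hash only
                    "-records-per-frame", "200", // grace/hybrid hash only
                    "-grace-factor", "1.2",      // grace/hybrid hash only
                    "-table-size", "8191",       // in-memory hash join table size
                    "-has-groupby"               // boolean flag: run the count group-by after the join
            };
            System.out.println(Arrays.toString(newJoinFlags));
        }
    }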
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
index 2f66bfd..a134a39 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.tests.spillable;
import java.io.File;
+import java.io.IOException;
import org.junit.Test;
@@ -59,61 +60,65 @@
import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
/**
- * @author jarodwen
+ *
*/
public class ExternalAggregateTest extends AbstractIntegrationTest {
- final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
- new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
-
- static final String outSplitsPrefix = System.getProperty("java.io.tmpdir");
-
- static final String outSplits1 = "nc1:" + outSplitsPrefix + "/aggregation_";
- static final String outSplits2 = "nc2:" + outSplitsPrefix + "/aggregation_";
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/lineitem.tbl"))) });
static final boolean isOutputFile = true;
- final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ final RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
- UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|');
+ final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
+ new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|');
- private static FileSplit[] parseFileSplits(String fileSplits) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
- }
- return fSplits;
- }
-
- private static AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, boolean isFile,
- String prefix) {
+ private AbstractSingleActivityOperatorDescriptor getPrinter(
+ JobSpecification spec, boolean isFile, String prefix)
+ throws IOException {
AbstractSingleActivityOperatorDescriptor printer;
if (!isOutputFile)
printer = new PrinterOperatorDescriptor(spec);
else
- printer = new PlainFileWriterOperatorDescriptor(spec, new ConstantFileSplitProvider(
- parseFileSplits(outSplits1 + prefix + ".nc1, " + outSplits2 + prefix + ".nc2")), "\t");
+ printer = new PlainFileWriterOperatorDescriptor(spec,
+ new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC1_ID, createTempFile()
+ .getAbsolutePath()),
+ new FileSplit(NC2_ID, createTempFile()
+ .getAbsolutePath()) }), "\t");
return printer;
}
@@ -122,37 +127,51 @@
public void hashSingleKeyScalarGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 3;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(), new CountAggregatorDescriptorFactory(),
- new IntSumAggregatorDescriptorFactory(keyFields.length), outputRec,
- new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize),
- true);
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new CountAggregatorDescriptorFactory(),
+ new IntSumAggregatorDescriptorFactory(keyFields.length),
+ outputRec,
+ new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
- "hashSingleKeyScalarGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ isOutputFile, "hashSingleKeyScalarGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -165,37 +184,56 @@
public void hashMultipleKeyScalarGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, });
int[] keyFields = new int[] { 0, 9 };
int frameLimits = 3;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
- new IntSumAggregatorDescriptorFactory(1), new IntSumAggregatorDescriptorFactory(keyFields.length),
- outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new IntSumAggregatorDescriptorFactory(1),
+ new IntSumAggregatorDescriptorFactory(keyFields.length),
+ outputRec,
+ new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
- "hashMultipleKeyScalarGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ isOutputFile, "hashMultipleKeyScalarGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -208,40 +246,63 @@
public void hashMultipleKeyMultipleScalarGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE, });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, });
int[] keyFields = new int[] { 0, 9 };
int frameLimits = 3;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
- new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
- new IntSumAggregatorDescriptorFactory(1, 2), new IntSumAggregatorDescriptorFactory(2, 3) }),
- new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
- new IntSumAggregatorDescriptorFactory(2, 2), new IntSumAggregatorDescriptorFactory(3, 3) }),
- outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] {
+ new IntSumAggregatorDescriptorFactory(1, 2),
+ new IntSumAggregatorDescriptorFactory(2, 3) }),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] {
+ new IntSumAggregatorDescriptorFactory(2, 2),
+ new IntSumAggregatorDescriptorFactory(3, 3) }),
+ outputRec,
+ new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
- "hashMultipleKeyMultipleScalarGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ isOutputFile, "hashMultipleKeyMultipleScalarGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -254,36 +315,50 @@
public void hashMultipleKeyNonScalarGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 3;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(), new ConcatAggregatorDescriptorFactory(9),
- new ConcatAggregatorDescriptorFactory(keyFields.length), outputRec,
- new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize),
- true);
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new ConcatAggregatorDescriptorFactory(9),
+ new ConcatAggregatorDescriptorFactory(keyFields.length),
+ outputRec,
+ new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
- "hashMultipleKeyNonScalarGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ isOutputFile, "hashMultipleKeyNonScalarGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -296,43 +371,66 @@
public void hashMultipleKeyMultipleFieldsGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0, 9 };
int frameLimits = 3;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
- new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
- new IntSumAggregatorDescriptorFactory(1, 2), new IntSumAggregatorDescriptorFactory(2, 3),
- new ConcatAggregatorDescriptorFactory(9, 4) }), new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(2, 2),
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] {
+ new IntSumAggregatorDescriptorFactory(1, 2),
+ new IntSumAggregatorDescriptorFactory(2, 3),
+ new ConcatAggregatorDescriptorFactory(9, 4) }),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] {
+ new IntSumAggregatorDescriptorFactory(2, 2),
new IntSumAggregatorDescriptorFactory(3, 3),
- new ConcatAggregatorDescriptorFactory(4, 4) }), outputRec,
- new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
+ new ConcatAggregatorDescriptorFactory(4, 4) }),
+ outputRec,
+ new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
- "hashMultipleKeyMultipleFieldsGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ isOutputFile, "hashMultipleKeyMultipleFieldsGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -345,37 +443,51 @@
public void hashSingleKeyScalarAvgGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 3;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(), new AvgAggregatorDescriptorFactory(1),
- new AvgAggregatorDescriptorFactory(keyFields.length), outputRec, new HashSpillableGroupingTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new AvgAggregatorDescriptorFactory(1),
+ new AvgAggregatorDescriptorFactory(keyFields.length),
+ outputRec,
+ new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
- "hashSingleKeyScalarGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ isOutputFile, "hashSingleKeyScalarGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
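
The getPrinter() change above replaces hand-concatenated paths under java.io.tmpdir with per-node temporary files obtained from createTempFile() (presumably a helper provided by AbstractIntegrationTest). A self-contained sketch, standard JDK only, of why this is the more portable pattern:

    // Minimal sketch: File.createTempFile resolves java.io.tmpdir itself and
    // returns a unique, platform-correct absolute path, avoiding the separator
    // and collision issues of string-built paths like outSplitsPrefix + name.
    import java.io.File;
    import java.io.IOException;

    public class TempSplitSketch {
        public static void main(String[] args) throws IOException {
            File out = File.createTempFile("aggregation_", ".out");
            out.deleteOnExit(); // clean up after the test run
            // The test wraps such a path per node, e.g.
            // new FileSplit(NC1_ID, out.getAbsolutePath())
            System.out.println(out.getAbsolutePath());
        }
    }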
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 09d0e5a..a43aafb 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -22,13 +22,18 @@
import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -52,7 +57,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
public class Main {
private static class Options {
@@ -79,6 +87,29 @@
@Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
public boolean profile = true;
+
+ @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
+ public int tableSize = 8191;
+
+ @Option(name = "-algo", usage = "Join types", required = true)
+ public String algo;
+
+ // For grace/hybrid hash join only
+ @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
+ public int memSize;
+
+ @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
+ public int graceInputSize = 10;
+
+ @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
+ public int graceRecordsPerFrame = 200;
+
+ @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
+ public double graceFactor = 1.2;
+
+ // Whether group-by is processed after the join
+ @Option(name = "-has-groupby", usage = "Whether to have group-by operation after join (default: disabled)", required = false)
+ public boolean hasGroupBy = false;
}
public static void main(String[] args) throws Exception {
@@ -86,15 +117,24 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
+ options.port);
- JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
- parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
- options.numJoinPartitions);
+ JobSpecification job = createJob(
+ parseFileSplits(options.inFileCustomerSplits),
+ parseFileSplits(options.inFileOrderSplits),
+ parseFileSplits(options.outFileSplits),
+ options.numJoinPartitions, options.algo,
+ options.graceInputSize, options.graceRecordsPerFrame,
+ options.graceFactor, options.memSize, options.tableSize,
+ options.hasGroupBy);
long start = System.currentTimeMillis();
- JobId jobId = hcc.createJob(options.app, job,
- options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ JobId jobId = hcc.createJob(
+ options.app,
+ job,
+ options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
+ .noneOf(JobFlag.class));
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
@@ -108,109 +148,271 @@
String s = splits[i].trim();
int idx = s.indexOf(':');
if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
+ throw new IllegalArgumentException("File split " + s
+ + " not well formed");
}
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
+ new File(s.substring(idx + 1))));
}
return fSplits;
}
- private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
- FileSplit[] resultSplits, int numJoinPartitions) {
+ private static JobSpecification createJob(FileSplit[] customerSplits,
+ FileSplit[] orderSplits, FileSplit[] resultSplits,
+ int numJoinPartitions, String algo, int graceInputSize,
+ int graceRecordsPerFrame, double graceFactor, int memSize,
+ int tableSize, boolean hasGroupBy) throws HyracksDataException {
JobSpecification spec = new JobSpecification();
- IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
- RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(
+ customerSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
+ orderSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
+ spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
createPartitionConstraint(spec, ordScanner, orderSplits);
- FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(
+ spec, custSplitsProvider, new DelimitedDataTupleParserFactory(
+ new IValueParserFactory[] {
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'),
+ custDesc);
createPartitionConstraint(spec, custScanner, customerSplits);
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
- new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc,
- 6000000);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
+ IOperatorDescriptor join;
- RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ if ("nestedloop".equalsIgnoreCase(algo)) {
+ join = new NestedLoopJoinOperatorDescriptor(spec,
+ new JoinComparatorFactory(
+ UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
+ custOrderJoinDesc, memSize);
- HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
+ } else if ("gracehash".equalsIgnoreCase(algo)) {
+ join = new GraceHashJoinOperatorDescriptor(
+ spec,
+ memSize,
+ graceInputSize,
+ graceRecordsPerFrame,
+ graceFactor,
+ new int[] { 0 },
+ new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ custOrderJoinDesc);
+
+ } else if ("hybridhash".equalsIgnoreCase(algo)) {
+ join = new HybridHashJoinOperatorDescriptor(
+ spec,
+ memSize,
+ graceInputSize,
+ graceRecordsPerFrame,
+ graceFactor,
+ new int[] { 0 },
+ new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ custOrderJoinDesc);
+
+ } else {
+ join = new InMemoryHashJoinOperatorDescriptor(
+ spec,
+ new int[] { 0 },
+ new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ custOrderJoinDesc, 6000000);
+ }
+
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, join,
+ numJoinPartitions);
+
+ IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(
spec,
- new int[] { 6 },
- new FieldHashPartitionComputerFactory(new int[] { 6 },
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
- groupResultDesc, 16);
- createPartitionConstraint(spec, gby, resultSplits);
-
- IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
- FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
- createPartitionConstraint(spec, writer, resultSplits);
-
- IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new FieldHashPartitionComputerFactory(
+ new int[] { 1 },
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordJoinConn, ordScanner, 0, join, 1);
- IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
+ IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ new int[] { 0 },
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custJoinConn, custScanner, 0, join, 0);
- IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 6 },
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(joinGroupConn, join, 0, gby, 0);
+ IOperatorDescriptor endingOp = join;
- IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(gbyPrinterConn, gby, 0, writer, 0);
+ if (hasGroupBy) {
+
+ RecordDescriptor groupResultDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+
+ HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
+ spec,
+ new int[] { 6 },
+ new FieldHashPartitionComputerFactory(
+ new int[] { 6 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ groupResultDesc, 16);
+ createPartitionConstraint(spec, gby, resultSplits);
+
+ IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ new int[] { 6 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(joinGroupConn, join, 0, gby, 0);
+
+ endingOp = gby;
+ }
+
+ IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
+ resultSplits);
+ FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(
+ spec, outSplitProvider);
+ createPartitionConstraint(spec, writer, resultSplits);
+
+ IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(
+ spec);
+ spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
spec.addRoot(writer);
return spec;
}
- private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+ private static void createPartitionConstraint(JobSpecification spec,
+ IOperatorDescriptor op, FileSplit[] splits) {
String[] parts = new String[splits.length];
for (int i = 0; i < splits.length; ++i) {
parts[i] = splits[i].getNodeName();
}
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
+ PartitionConstraintHelper
+ .addAbsoluteLocationConstraint(spec, op, parts);
+ }
+
+ static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final IBinaryComparatorFactory bFactory;
+ private final int pos0;
+ private final int pos1;
+
+ public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
+ int pos0, int pos1) {
+ this.bFactory = bFactory;
+ this.pos0 = pos0;
+ this.pos1 = pos1;
+ }
+
+ @Override
+ public ITuplePairComparator createTuplePairComparator() {
+ return new JoinComparator(bFactory.createBinaryComparator(), pos0,
+ pos1);
+ }
+ }
+
+ static class JoinComparator implements ITuplePairComparator {
+
+ private final IBinaryComparator bComparator;
+ private final int field0;
+ private final int field1;
+
+ public JoinComparator(IBinaryComparator bComparator, int field0,
+ int field1) {
+ this.bComparator = bComparator;
+ this.field0 = field0;
+ this.field1 = field1;
+ }
+
+ @Override
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0,
+ IFrameTupleAccessor accessor1, int tIndex1) {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
+ + fStartOffset0, fLen0, accessor1.getBuffer().array(),
+ fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ return 0;
+ }
}
}
\ No newline at end of file
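
The new JoinComparator resolves each tuple's join field to an absolute (start, length) byte range in the frame before delegating to the wrapped IBinaryComparator. A non-authoritative restatement of that offset arithmetic (accessor method names as used in the diff; the helper class itself is illustrative, not project code):

    import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

    // A field's bytes live at tupleStartOffset + fieldSlotsLength + fieldStartOffset
    // within the frame's backing array; its length is fieldEnd - fieldStart.
    final class FieldByteRange {
        final int start;  // absolute offset into accessor.getBuffer().array()
        final int length; // field length in bytes

        FieldByteRange(IFrameTupleAccessor accessor, int tupleIndex, int field) {
            int tupleStart = accessor.getTupleStartOffset(tupleIndex);
            int fieldStart = accessor.getFieldStartOffset(tupleIndex, field);
            int fieldEnd = accessor.getFieldEndOffset(tupleIndex, field);
            this.start = tupleStart + accessor.getFieldSlotsLength() + fieldStart;
            this.length = fieldEnd - fieldStart;
        }
    }

Note that the trailing "if (c != 0) { return c; } return 0;" in compare() is equivalent to a plain "return c;".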
diff --git a/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml b/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
index 7d40c81..50bea01 100644
--- a/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
+++ b/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
@@ -19,6 +19,7 @@
<configuration>
<source>1.6</source>
<target>1.6</target>
+ <encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
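
A plausible reading of the Mac-only failure fixed above: without an explicit <encoding>, the Maven compiler plugin hands javac the platform-default charset (not UTF-8 on older Mac OS X setups), so UTF-8-encoded test sources containing non-ASCII characters decode incorrectly or fail to compile. A hypothetical example of the kind of source that trips this (the actual failing file is not part of this diff):

    // Hypothetical: a non-ASCII literal in a UTF-8 source file mis-decodes
    // (or raises "unmappable character" errors) under a non-UTF-8 platform
    // default encoding unless the build pins <encoding>UTF-8</encoding>.
    public class Utf8LiteralExample {
        public static void main(String[] args) {
            System.out.println("café"); // 'é' is multi-byte in UTF-8
        }
    }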