Implement the memory-bounded HashGroupBy and HashJoin for BigObject

This change covers both the hash group-by and the hash join operators.
The main changes are:
1. Update the external group-by (ExternalGroupOperatorDescriptor) to a
   hash-based group-by.
2. Update the join operators to use the buffer manager.
The buffer manager is moved out of the Sort package into an upper-level
package so that it can be shared by all operators.
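
As an illustration of the resulting API change (a sketch distilled from
the updated AggregationTest hunks below, not code from this commit): the
external group-by now takes the hash table size, an expected input size,
and a HashSpillableTableFactory built from binary hash-function families
instead of plain hash-function factories, which lets the spillable table
derive differently seeded hash functions (e.g., one per spill level).
The names aggFactory, mergeFactory, partialRec, and outRec are
placeholders for the MultiFieldsAggregatorFactory instances and
RecordDescriptors that each test constructs.

    // Sketch only; placeholder names are marked above.
    int[] keyFields = new int[] { 0 };
    int frameLimits = 4;                                // memory budget, in frames
    int tableSize = 8;                                  // hash table size
    long fileSize = frameLimits * spec.getFrameSize();  // expected input size hint

    ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
            spec, tableSize, fileSize, keyFields, frameLimits,
            new IBinaryComparatorFactory[] {
                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
            new UTF8StringNormalizedKeyComputerFactory(),
            aggFactory,    // partial (map-side) aggregation
            mergeFactory,  // final (merge-side) aggregation
            partialRec, outRec,
            new HashSpillableTableFactory(
                    new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));

As the hunks below show, the old constructor instead passed a
FieldHashPartitionComputerFactory and the table size into
HashSpillableTableFactory, plus a trailing boolean flag; those knobs now
live on the operator itself.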
Change-Id: I248f3a374fdacad7d57e49cf18d8233745e55460
Reviewed-on: https://asterix-gerrit.ics.uci.edu/398
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index c330f8e..e3093c4 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -21,12 +21,11 @@
import java.io.File;
import java.io.IOException;
-import org.junit.Test;
-
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.ResultSetId;
@@ -34,6 +33,7 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -63,18 +63,18 @@
import org.apache.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
+import org.junit.Test;
/**
*
*/
public class AggregationTest extends AbstractIntegrationTest {
- final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
- new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
@@ -106,50 +106,6 @@
}
@Test
- public void singleKeySumInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
-
- int[] keyFields = new int[] { 0 };
- int tableSize = 8;
-
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
- new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
public void singleKeySumPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
@@ -168,14 +124,14 @@
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
- new FloatSumFieldAggregatorFactory(5, true) }), outputRec);
+ new FloatSumFieldAggregatorFactory(5, true) }),
+ outputRec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
@@ -205,26 +161,26 @@
int[] keyFields = new int[] { 0 };
int frameLimits = 4;
int tableSize = 8;
+ long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+ keyFields, frameLimits,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(3, false),
- new FloatSumFieldAggregatorFactory(5, false) }), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(2, false),
- new FloatSumFieldAggregatorFactory(3, false) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }), tableSize), true);
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
+ new FloatSumFieldAggregatorFactory(5, false) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
+ new FloatSumFieldAggregatorFactory(3, false) }),
+ outputRec, outputRec, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumExtGroupTest");
@@ -239,50 +195,6 @@
}
@Test
- public void singleKeyAvgInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
-
- int[] keyFields = new int[] { 0 };
- int tableSize = 8;
-
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
public void singleKeyAvgPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
@@ -299,16 +211,16 @@
PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true), new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
@@ -338,26 +250,26 @@
int[] keyFields = new int[] { 0 };
int frameLimits = 4;
int tableSize = 8;
+ long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+ keyFields, frameLimits,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
- new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+ new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(2, false),
- new AvgFieldMergeAggregatorFactory(3, false) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }), tableSize), true);
+ new CountFieldAggregatorFactory(false), new AvgFieldGroupAggregatorFactory(1, false) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
+ new AvgFieldMergeAggregatorFactory(3, false) }),
+ outputRec, outputRec, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
@@ -372,50 +284,6 @@
}
@Test
- public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
- new UTF8StringSerializerDeserializer() });
-
- int[] keyFields = new int[] { 0 };
- int tableSize = 8;
-
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
@@ -424,24 +292,24 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
- new UTF8StringSerializerDeserializer() });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+ IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
int[] keyFields = new int[] { 0 };
PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15, true, false) }),
+ outputRec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
@@ -464,80 +332,36 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
- new UTF8StringSerializerDeserializer() });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+ IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
int[] keyFields = new int[] { 0 };
int frameLimits = 4;
int tableSize = 8;
+ long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+ keyFields, frameLimits,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
new MinMaxStringFieldAggregatorFactory(15, true, true) }),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(2, true, true) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }), tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
- public void multiKeySumInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-
- int[] keyFields = new int[] { 8, 0 };
- int tableSize = 8;
-
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
- outputRec, tableSize);
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(2, true, true) }),
+ outputRec, outputRec, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
@@ -573,9 +397,10 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
@@ -605,26 +430,27 @@
int[] keyFields = new int[] { 8, 0 };
int frameLimits = 4;
int tableSize = 8;
+ long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+ keyFields, frameLimits,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
- new IntSumFieldAggregatorFactory(3, false) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false), new IntSumFieldAggregatorFactory(3, false) }),
+ outputRec, outputRec, new HashSpillableTableFactory(new IBinaryHashFunctionFamily[] {
+ UTF8StringBinaryHashFunctionFamily.INSTANCE, UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumExtGroupTest");
@@ -639,52 +465,6 @@
}
@Test
- public void multiKeyAvgInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
-
- int[] keyFields = new int[] { 8, 0 };
- int tableSize = 8;
-
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
public void multiKeyAvgPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
@@ -693,26 +473,28 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true), new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
@@ -735,36 +517,37 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int frameLimits = 4;
int tableSize = 8;
+ long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+ keyFields, frameLimits,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new CountFieldAggregatorFactory(false), new AvgFieldGroupAggregatorFactory(1, false) }),
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
- new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
- new IntSumFieldAggregatorFactory(3, false),
- new AvgFieldMergeAggregatorFactory(4, false) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+ new IntSumFieldAggregatorFactory(2, false), new IntSumFieldAggregatorFactory(3, false),
+ new AvgFieldMergeAggregatorFactory(4, false) }),
+ outputRec, outputRec, new HashSpillableTableFactory(new IBinaryHashFunctionFamily[] {
+ UTF8StringBinaryHashFunctionFamily.INSTANCE, UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgExtGroupTest");
@@ -779,51 +562,6 @@
}
@Test
- public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
-
- int[] keyFields = new int[] { 8, 0 };
- int tableSize = 8;
-
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
-
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringInmemGroupTest");
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
@@ -841,16 +579,18 @@
PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15, true, false) }),
+ outputRec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringPreClusterGroupTest");
@@ -880,27 +620,29 @@
int[] keyFields = new int[] { 8, 0 };
int frameLimits = 4;
int tableSize = 8;
+ long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+ keyFields, frameLimits,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
new MinMaxStringFieldAggregatorFactory(15, true, true) }),
- new MultiFieldsAggregatorFactory(new int[] { 0, 1 }, new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(2, false),
- new MinMaxStringFieldAggregatorFactory(3, true, true) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+ new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+ new MinMaxStringFieldAggregatorFactory(3, true, true) }),
+ outputRec, outputRec, new HashSpillableTableFactory(new IBinaryHashFunctionFamily[] {
+ UTF8StringBinaryHashFunctionFamily.INSTANCE, UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringExtGroupTest");
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
similarity index 99%
rename from hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedSortMergeTest.java
rename to hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 555694f..699389b 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -52,7 +52,7 @@
import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
-public class OptimizedSortMergeTest extends AbstractIntegrationTest {
+public class HeapSortMergeTest extends AbstractIntegrationTest {
@Test
public void optimizedSortMergeTest01() throws Exception {
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 27650e4..ad3c006 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -22,12 +22,11 @@
import java.io.IOException;
import java.util.BitSet;
-import org.junit.Test;
-
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.ResultSetId;
@@ -35,10 +34,12 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
@@ -56,21 +57,25 @@
import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
+import org.junit.Test;
public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest {
- final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit("asterix-001", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
- new FileSplit("asterix-002", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
- new FileSplit("asterix-003", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
- new FileSplit("asterix-004", new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit("asterix-001", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
+ new FileSplit("asterix-002", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
+ new FileSplit("asterix-003", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
+ new FileSplit("asterix-004", new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+
+ final int fileSize = 800 * 1024 * 4;
final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
@@ -116,14 +121,18 @@
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+ keyFields, fileSize / spec.getFrameSize() + 1,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
- new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
+ new FloatSumFieldAggregatorFactory(5, false) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
+ new FloatSumFieldAggregatorFactory(3, false) }),
+ outputRec, outputRec, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
@@ -136,8 +145,9 @@
IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }), new HashtableLocalityMap(nodemap));
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+ new HashtableLocalityMap(nodemap));
spec.connect(conn1, csvScanner, 0, grouper, 0);
@@ -176,21 +186,26 @@
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+ keyFields, fileSize / spec.getFrameSize() + 1,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
- new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
+ new FloatSumFieldAggregatorFactory(5, false) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
+ new FloatSumFieldAggregatorFactory(3, false) }),
+ outputRec, outputRec, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }), new GlobalHashingLocalityMap());
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+ new GlobalHashingLocalityMap());
spec.connect(conn1, csvScanner, 0, grouper, 0);
@@ -209,7 +224,7 @@
throws IOException {
ResultSetId rsId = new ResultSetId(1);
- AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,false,
+ AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
spec.addResultSetId(rsId);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
index a10513a..a4c87c8 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.tests.integration;
import java.io.File;
+import java.util.Arrays;
import org.junit.Test;
@@ -43,63 +44,73 @@
import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
public class TPCHCustomerOptimizedHybridHashJoinTest extends AbstractIntegrationTest {
- private static final boolean DEBUG = false;
+
+ private static boolean DEBUG = false;
+
+ static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
+
+ static RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer() });
+
+ static RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer() });
+
+ static IValueParserFactory[] custValueParserFactories = new IValueParserFactory[custDesc.getFieldCount()];
+ static IValueParserFactory[] orderValueParserFactories = new IValueParserFactory[ordersDesc.getFieldCount()];
+
+ static {
+ Arrays.fill(custValueParserFactories, UTF8StringParserFactory.INSTANCE);
+ Arrays.fill(orderValueParserFactories, UTF8StringParserFactory.INSTANCE);
+ }
+
+ private IOperatorDescriptor getPrinter(JobSpecification spec, File file) {
+ IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] {
+ new FileSplit(NC1_ID, file.getAbsolutePath()) });
+
+ return DEBUG ? new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|")
+ : new NullSinkOperatorDescriptor(spec);
+ }
@Test
public void customerOrderCIDHybridHashJoin_Case1() throws Exception {
JobSpecification spec = new JobSpecification();
-
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
"data/tpch0.001/customer4.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
- RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
"data/tpch0.001/orders4.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
-
- RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
-
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243,
@@ -107,13 +118,14 @@
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
- new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+ null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
- : new NullSinkOperatorDescriptor(spec);
+ File file = File.createTempFile(getClass().getName(), "case1");
+ IOperatorDescriptor printer = getPrinter(spec, file);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -127,6 +139,7 @@
spec.addRoot(printer);
runTest(spec);
+ System.out.println("output to " + file.getAbsolutePath());
}
@Test
@@ -136,48 +149,18 @@
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
"data/tpch0.001/customer3.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
- RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
"data/tpch0.001/orders4.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
-
- RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122,
@@ -185,13 +168,14 @@
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
- new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+ null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
- : new NullSinkOperatorDescriptor(spec);
+ File file = File.createTempFile(getClass().getName(), "case2");
+ IOperatorDescriptor printer = getPrinter(spec, file);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -205,6 +189,7 @@
spec.addRoot(printer);
runTest(spec);
+ System.out.println("output to " + file.getAbsolutePath());
}
@Test
@@ -215,48 +200,18 @@
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
"data/tpch0.001/customer3.tbl"))) };
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
- RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
"data/tpch0.001/orders1.tbl"))) };
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
-
- RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer() });
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122,
@@ -264,13 +219,14 @@
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
- new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+ null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
- IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
- : new NullSinkOperatorDescriptor(spec);
+ File file = File.createTempFile(getClass().getName(), "case3");
+ IOperatorDescriptor printer = getPrinter(spec, file);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -284,6 +240,7 @@
spec.addRoot(printer);
runTest(spec);
+ System.out.println("output to " + file.getAbsolutePath());
}
}
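
The three cases above differ only in the join's memory budget and build-size hint: case 1 runs with 15 frames against an estimated 243 build frames, case 2 with 15 frames against 122, and case 3 with only 6 frames against 122, pushing the optimized hybrid hash join from a mostly in-memory run into progressively heavier spilling. For orientation, the textbook Shapiro-style estimate of how many partitions such a join spills can be sketched as follows; this is an illustrative formula with hypothetical names (numPartitions, fudge), not the operator's actual sizing code.

    // Illustrative sketch: textbook hybrid hash join partition estimate,
    // given a memory budget and an estimated build-input size in frames.
    public final class HybridHashJoinSizing {
        public static int numPartitions(int memFrames, int buildFrames, double fudge) {
            if (buildFrames * fudge <= memFrames) {
                return 1; // the whole build side fits in memory; nothing spills
            }
            // One frame buffers each spilled partition's output; the rest of the
            // budget keeps the resident build partition in memory.
            return (int) Math.ceil((buildFrames * fudge - memFrames) / (memFrames - 1));
        }

        public static void main(String[] args) {
            System.out.println(numPartitions(15, 243, 1.2)); // case 1 -> 20
            System.out.println(numPartitions(15, 122, 1.2)); // case 2 -> 10
            System.out.println(numPartitions(6, 122, 1.2)); // case 3 -> 29
        }
    }
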
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 2650799..8232a62 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -292,7 +292,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, null);
+ custOrderJoinDesc, null, false, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -815,7 +815,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, null);
+ custOrderJoinDesc, null, false, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
index b774e0e..039497c 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/VSizeFrameSortMergeTest.java
@@ -87,7 +87,6 @@
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
- // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID );
spec.setFrameSize(frameSize);
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, new int[] { 1, 0 },
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
new file mode 100644
index 0000000..b8ec790
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public abstract class AbstractExternalGroupbyTest {
+
+ ISerializerDeserializer[] inFields = new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE,
+ new UTF8StringSerializerDeserializer(),
+ };
+
+ ISerializerDeserializer[] aggrFields = new ISerializerDeserializer[] {
+ new UTF8StringSerializerDeserializer(), // key
+ IntegerSerializerDeserializer.INSTANCE, // sum
+ IntegerSerializerDeserializer.INSTANCE, // count
+ FloatSerializerDeserializer.INSTANCE, // avg
+ };
+
+ RecordDescriptor inRecordDesc = new RecordDescriptor(inFields);
+
+ RecordDescriptor outputRec = new RecordDescriptor(aggrFields);
+
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+
+ INormalizedKeyComputerFactory normalizedKeyComputerFactory = new UTF8StringNormalizedKeyComputerFactory();
+
+ IAggregatorDescriptorFactory partialAggrInPlace = new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(0, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(0, false) });
+
+ IAggregatorDescriptorFactory finalAggrInPlace = new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false),
+ new AvgFieldMergeAggregatorFactory(3, false) });
+
+ IAggregatorDescriptorFactory partialAggrInState = new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(0, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(0, true) });
+
+ IAggregatorDescriptorFactory finalAggrInState = new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(2, true),
+ new AvgFieldMergeAggregatorFactory(3, true) });
+
+ int[] keyFields = new int[] { 1 };
+ int[] keyFieldsAfterPartial = new int[] { 0 };
+
+ class ResultValidateWriter implements IFrameWriter {
+
+ final Map<Integer, String> keyValueMap;
+ FrameTupleAccessor resultAccessor = new FrameTupleAccessor(outputRec);
+
+ class Result {
+ Result(int i) {
+ sum = i;
+ count = 1;
+ }
+
+ int sum;
+ int count;
+ }
+
+ private Map<String, Result> answer;
+
+ public ResultValidateWriter(Map<Integer, String> keyValueMap) {
+ this.keyValueMap = keyValueMap;
+ answer = new HashMap<>();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ for (Map.Entry<Integer, String> keyValue : keyValueMap.entrySet()) {
+ Result result = answer.get(keyValue.getValue());
+ if (result == null) {
+ answer.put(keyValue.getValue(), new Result(keyValue.getKey()));
+ } else {
+ result.sum += keyValue.getKey();
+ result.count++;
+ }
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ resultAccessor.reset(buffer);
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream di = new DataInputStream(bbis);
+
+ Object[] outRecord = new Object[outputRec.getFieldCount()];
+
+ for (int tid = 0; tid < resultAccessor.getTupleCount(); tid++) {
+ for (int fid = 0; fid < outputRec.getFieldCount(); fid++) {
+ bbis.setByteBuffer(resultAccessor.getBuffer(),
+ resultAccessor.getAbsoluteFieldStartOffset(tid, fid));
+ outRecord[fid] = outputRec.getFields()[fid].deserialize(di);
+ }
+ Result result = answer.remove((String) outRecord[0]);
+ assertNotNull(result);
+ assertEquals(result.sum, (int) outRecord[1]);
+ assertEquals(result.count, (int) outRecord[2]);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ Assert.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ assertEquals(0, answer.size());
+ }
+ }
+
+ @Test
+ public void testBuildAndMergeNormalFrameInMem() throws HyracksDataException {
+ int tableSize = 1001;
+ int numFrames = 3;
+ int frameSize = 256;
+ int minDataSize = frameSize;
+ int minRecordSize = 20;
+ int maxRecordSize = 50;
+ testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, null);
+ }
+
+ @Test
+ public void testBuildAndMergeNormalFrameSpill() throws HyracksDataException {
+ int tableSize = 1001;
+ int numFrames = 3;
+ int frameSize = 256;
+ int minDataSize = frameSize * 4;
+ int minRecordSize = 20;
+ int maxRecordSize = 50;
+ testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, null);
+ }
+
+ @Test
+ public void testBuildAndMergeBigObj() throws HyracksDataException {
+ int tableSize = 1001;
+ int numFrames = 4;
+ int frameSize = 256;
+ int minDataSize = frameSize * 5;
+ int minRecordSize = 20;
+ int maxRecordSize = 50;
+ HashMap<Integer, String> bigRecords = AbstractRunGeneratorTest.generateBigObject(frameSize, 2);
+ testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize,
+ bigRecords);
+ }
+
+ protected abstract void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) throws HyracksDataException;
+
+ protected abstract IFrameWriter getBuilder();
+
+ protected abstract IOperatorNodePushable getMerger();
+
+ private void testBuildAndMerge(int tableSize, int numFrames, int frameSize, int minDataSize,
+ int minRecordSize, int maxRecordSize,
+ Map<Integer, String> specialData)
+ throws HyracksDataException {
+
+ IHyracksTaskContext ctx = TestUtils.create(frameSize);
+ initial(ctx, tableSize, numFrames);
+ ArrayList<IFrame> input = new ArrayList<>();
+ Map<Integer, String> keyValueMap = new HashMap<>();
+ AbstractRunGeneratorTest
+ .prepareData(ctx, input, minDataSize, minRecordSize, maxRecordSize, specialData, keyValueMap);
+
+ ResultValidateWriter writer = new ResultValidateWriter(keyValueMap);
+
+ getBuilder().open();
+ for (IFrame frame : input) {
+ getBuilder().nextFrame(frame.getBuffer());
+ }
+ getBuilder().close();
+
+ getMerger().setOutputFrameWriter(0, writer, outputRec);
+ getMerger().initialize();
+ }
+
+}
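
The big-object test above relies on AbstractRunGeneratorTest.generateBigObject(frameSize, 2) to inject a record spanning roughly two frames, which is what forces the groupby onto variable-sized frames rather than failing on a tuple that cannot fit in one page. A minimal sketch of such a generator, assuming it only needs to produce one integer-keyed string of roughly frameSize * numFrames bytes (class and variable names here are illustrative):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Random;

    // Hedged sketch of a big-object generator: emits one (key, value) pair
    // whose value is wide enough to span several frames.
    public final class BigObjectSketch {
        public static Map<Integer, String> generateBigObject(int frameSize, int numFrames) {
            Random random = new Random();
            StringBuilder sb = new StringBuilder();
            // Leave slack for the frame header, field offsets, and the key field.
            int targetBytes = frameSize * numFrames - frameSize / 2;
            while (sb.length() < targetBytes) {
                sb.append((char) ('a' + random.nextInt(26)));
            }
            return Collections.singletonMap(random.nextInt(), sb.toString());
        }

        public static void main(String[] args) {
            // For a 256-byte frame and 2 frames this prints a length near 384.
            System.out.println(generateBigObject(256, 2).values().iterator().next().length());
        }
    }
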
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index 3cc2a23..673c6fa 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -45,8 +45,8 @@
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
import org.apache.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
import org.apache.hyracks.test.support.TestUtils;
@@ -54,8 +54,8 @@
public abstract class AbstractRunGeneratorTest {
static TestUtils testUtils = new TestUtils();
- static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+ static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+ new UTF8StringSerializerDeserializer() };
static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
static Random GRandom = new Random(System.currentTimeMillis());
static int[] SortFields = new int[] { 0, 1 };
@@ -63,17 +63,17 @@
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
- static void assertMaxFrameSizesAreAllEqualsTo(List<RunAndMaxFrameSizePair> maxSize, int pageSize) {
+ static void assertMaxFrameSizesAreAllEqualsTo(List<GeneratedRunFileReader> maxSize, int pageSize) {
for (int i = 0; i < maxSize.size(); i++) {
- assertTrue(maxSize.get(i).maxFrameSize == pageSize);
+ assertTrue(maxSize.get(i).getMaxFrameSize() == pageSize);
}
}
abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
throws HyracksDataException;
- protected List<RunAndMaxFrameSizePair> testSortRecords(int pageSize, int frameLimit, int numRuns,
- int minRecordSize, int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+ protected List<GeneratedRunFileReader> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
+ int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
IHyracksTaskContext ctx = testUtils.create(pageSize);
HashMap<Integer, String> keyValuePair = new HashMap<>();
@@ -90,12 +90,12 @@
return runGenerator.getRuns();
}
- static void matchResult(IHyracksTaskContext ctx, List<RunAndMaxFrameSizePair> runs,
+ static void matchResult(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
Map<Integer, String> keyValuePair) throws HyracksDataException {
HashMap<Integer, String> copyMap2 = new HashMap<>(keyValuePair);
int maxFrameSizes = 0;
- for (RunAndMaxFrameSizePair run : runs) {
- maxFrameSizes = Math.max(maxFrameSizes, run.maxFrameSize);
+ for (GeneratedRunFileReader run : runs) {
+ maxFrameSizes = Math.max(maxFrameSizes, run.getMaxFrameSize());
}
GroupVSizeFrame gframe = new GroupVSizeFrame(ctx, maxFrameSizes);
GroupFrameAccessor gfta = new GroupFrameAccessor(ctx.getInitialFrameSize(), RecordDesc);
@@ -125,25 +125,25 @@
return preKey;
}
- static void assertReadSorted(List<RunAndMaxFrameSizePair> runs, IFrameTupleAccessor fta, IFrame frame,
+ static void assertReadSorted(List<GeneratedRunFileReader> runs, IFrameTupleAccessor fta, IFrame frame,
Map<Integer, String> keyValuePair) throws HyracksDataException {
assertTrue(runs.size() > 0);
- for (RunAndMaxFrameSizePair run : runs) {
- run.run.open();
+ for (GeneratedRunFileReader run : runs) {
+ run.open();
int preKey = Integer.MIN_VALUE;
- while (run.run.nextFrame(frame)) {
+ while (run.nextFrame(frame)) {
fta.reset(frame.getBuffer());
preKey = assertFTADataIsSorted(fta, keyValuePair, preKey);
}
- run.run.close();
+ run.close();
}
assertTrue(keyValuePair.isEmpty());
}
static void prepareData(IHyracksTaskContext ctx, List<IFrame> frameList, int minDataSize, int minRecordSize,
int maxRecordSize, Map<Integer, String> specialData, Map<Integer, String> keyValuePair)
- throws HyracksDataException {
+ throws HyracksDataException {
ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
FrameTupleAppender appender = new FrameTupleAppender();
@@ -223,7 +223,7 @@
int numRuns = 2;
int minRecordSize = pageSize / 8;
int maxRecordSize = pageSize / 8;
- List<RunAndMaxFrameSizePair> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+ List<GeneratedRunFileReader> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
maxRecordSize, null);
assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
}
@@ -235,8 +235,8 @@
int numRuns = 2;
int minRecordSize = pageSize;
int maxRecordSize = (int) (pageSize * 1.8);
- List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
- maxRecordSize, null);
+ List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+ null);
assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2);
}
@@ -248,12 +248,12 @@
int minRecordSize = 20;
int maxRecordSize = pageSize / 2;
HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1);
- List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
- maxRecordSize, specialPair);
+ List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+ specialPair);
int max = 0;
- for (RunAndMaxFrameSizePair run : size) {
- max = Math.max(max, run.maxFrameSize);
+ for (GeneratedRunFileReader run : size) {
+ max = Math.max(max, run.getMaxFrameSize());
}
assertTrue(max == pageSize * (frameLimit - 1));
}
@@ -266,8 +266,8 @@
HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
int minRecordSize = 10;
int maxRecordSize = pageSize / 2;
- List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
- maxRecordSize, specialPair);
+ List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
+ specialPair);
}
}
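
The recurring change in this file is that the tests now hold GeneratedRunFileReader directly instead of the removed RunAndMaxFrameSizePair wrapper: the reader itself remembers the largest frame it produced, exposed through getMaxFrameSize(). Assembled from the calls visible above (open/nextFrame/close plus a GroupVSizeFrame sized per run), the consumption pattern looks roughly like this sketch:

    import java.util.List;

    import org.apache.hyracks.api.comm.IFrame;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
    import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;

    // Sketch: drain each generated run through a frame sized to that run's
    // recorded maximum frame size, so oversized records can still be read.
    public final class RunDrainSketch {
        public static void drain(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs)
                throws HyracksDataException {
            for (GeneratedRunFileReader run : runs) {
                IFrame frame = new GroupVSizeFrame(ctx, run.getMaxFrameSize());
                run.open();
                try {
                    while (run.nextFrame(frame)) {
                        // consume frame.getBuffer() here
                    }
                } finally {
                    run.close();
                }
            }
        }
    }
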
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
new file mode 100644
index 0000000..f1a4231
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import org.apache.hyracks.dataflow.std.group.ISpillableTableFactory;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupBuildOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupWriteOperatorNodePushable;
+
+public class ExternalHashGroupbyTest extends AbstractExternalGroupbyTest {
+ ExternalGroupBuildOperatorNodePushable buildOperator;
+ ExternalGroupWriteOperatorNodePushable mergeOperator;
+
+ @Override
+ protected void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) {
+ ISpillableTableFactory tableFactory = new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE });
+ buildOperator = new ExternalGroupBuildOperatorNodePushable(ctx, this.hashCode(), tableSize,
+ numFrames * ctx.getInitialFrameSize(), keyFields, numFrames, comparatorFactories,
+ normalizedKeyComputerFactory, partialAggrInPlace, inRecordDesc, outputRec, tableFactory);
+ mergeOperator = new ExternalGroupWriteOperatorNodePushable(ctx, this.hashCode(), tableFactory, outputRec,
+ outputRec, numFrames, keyFieldsAfterPartial, normalizedKeyComputerFactory, comparatorFactories,
+ finalAggrInPlace);
+ }
+
+ @Override
+ protected IFrameWriter getBuilder() {
+ return buildOperator;
+ }
+
+ @Override
+ protected AbstractUnaryOutputSourceOperatorNodePushable getMerger() {
+ return mergeOperator;
+ }
+
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index e6d10f2..567b7df 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -55,10 +55,9 @@
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
import org.junit.Test;
@@ -234,8 +233,8 @@
List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
List<TestFrameReader> readerList = new ArrayList<>(numRuns);
List<IFrame> frameList = new ArrayList<>(numRuns);
- prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize,
- readerList, frameList, keyValueMapList);
+ prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+ frameList, keyValueMapList);
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields,
Comparators, null, RecordDesc, topK);
@@ -313,8 +312,8 @@
int maxRecordSize = pageSize / 2;
IHyracksTaskContext ctx = testUtils.create(pageSize);
- ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null,
- ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, numFramesPerRun);
+ ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories,
+ RecordDesc, Algorithm.MERGE_SORT, numFramesPerRun);
runGenerator.open();
Map<Integer, String> keyValuePair = new HashMap<>();
@@ -336,20 +335,19 @@
}
runGenerator.close();
List<IFrame> inFrame = new ArrayList<>(runGenerator.getRuns().size());
- for (RunAndMaxFrameSizePair max : runGenerator.getRuns()) {
- inFrame.add(new GroupVSizeFrame(ctx, max.maxFrameSize));
+ for (GeneratedRunFileReader max : runGenerator.getRuns()) {
+ inFrame.add(new GroupVSizeFrame(ctx, max.getMaxFrameSize()));
}
// Prevent each run file reader from deleting its run file once it has been read and closed.
- for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
- RunFileReader runFileReader = (RunFileReader) run.run;
- PA.setValue(runFileReader, "deleteAfterClose", false);
+ for (GeneratedRunFileReader run : runGenerator.getRuns()) {
+ PA.setValue(run, "deleteAfterClose", false);
}
matchResult(ctx, runGenerator.getRuns(), keyValuePair);
List<IFrameReader> runs = new ArrayList<>();
- for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
- runs.add(run.run);
+ for (GeneratedRunFileReader run : runGenerator.getRuns()) {
+ runs.add(run);
}
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null,
RecordDesc);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
new file mode 100644
index 0000000..bcf661f
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.group.sort.ExternalSortGroupByRunGenerator;
+import org.apache.hyracks.dataflow.std.group.sort.ExternalSortGroupByRunMerger;
+import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
+import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.ISorter;
+
+public class SortGroupbyTest extends AbstractExternalGroupbyTest {
+ ExternalSortGroupByRunGenerator builder;
+
+ IOperatorNodePushable mergerOperator;
+
+ @Override
+ protected void initial(final IHyracksTaskContext ctx, int tableSize, final int numFrames)
+ throws HyracksDataException {
+ builder = new ExternalSortGroupByRunGenerator(ctx, keyFields, inRecordDesc, numFrames, keyFields,
+ normalizedKeyComputerFactory, comparatorFactories, partialAggrInState, outputRec, Algorithm.QUICK_SORT);
+
+ mergerOperator = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ List<GeneratedRunFileReader> runs = builder.getRuns();
+ ISorter sorter = builder.getSorter();
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ INormalizedKeyComputer nmkComputer = normalizedKeyComputerFactory == null ? null
+ : normalizedKeyComputerFactory.createNormalizedKeyComputer();
+ AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, sorter, runs, keyFields,
+ inRecordDesc, outputRec, outputRec, numFrames, writer, keyFields, nmkComputer, comparators,
+ partialAggrInState, finalAggrInState, true);
+ merger.process();
+ }
+ };
+ }
+
+ @Override
+ protected IFrameWriter getBuilder() {
+ return builder;
+ }
+
+ @Override
+ protected IOperatorNodePushable getMerger() {
+ return mergerOperator;
+ }
+}
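
Note the asymmetry in SortGroupbyTest's aggregator wiring: the run generator uses partialAggrInState (sum, count, and a grouped avg state), while the merger uses finalAggrInState with AvgFieldMergeAggregatorFactory. The reason is arithmetic: two partial averages cannot be combined directly; only their underlying (sum, count) pairs can, as this self-contained sketch with hypothetical names illustrates.

    // Sketch: why avg needs a mergeable (sum, count) state. Averaging two
    // partial averages directly would weight the two runs incorrectly.
    public final class AvgMergeSketch {
        static final class AvgState {
            final long sum;
            final long count;

            AvgState(long sum, long count) {
                this.sum = sum;
                this.count = count;
            }

            AvgState merge(AvgState other) {
                return new AvgState(sum + other.sum, count + other.count);
            }

            double finish() {
                return (double) sum / count;
            }
        }

        public static void main(String[] args) {
            AvgState a = new AvgState(10, 4); // partial run 1: avg 2.5
            AvgState b = new AvgState(30, 2); // partial run 2: avg 15.0
            // Correct merged avg is 40 / 6 = 6.67, not (2.5 + 15.0) / 2 = 8.75.
            System.out.println(a.merge(b).finish());
        }
    }
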
diff --git a/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks/hyracks-examples/text-example/textclient/pom.xml
index 3bf69e2..a88c421 100644
--- a/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -67,24 +67,7 @@
<goal>assemble</goal>
</goals>
</execution>
- <execution>
- <id>groupclient</id>
- <configuration>
- <programs>
- <program>
- <mainClass>org.apache.hyracks.examples.text.client.ExternalGroupClient</mainClass>
- <name>groupclient</name>
- </program>
- </programs>
- <repositoryLayout>flat</repositoryLayout>
- <repositoryName>lib</repositoryName>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>assemble</goal>
- </goals>
- </execution>
- </executions>
+ </executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
deleted file mode 100644
index 1ae32589..0000000
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/ExternalGroupClient.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.examples.text.client;
-
-import java.io.File;
-
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-
-/**
- * The application client for the performance tests of the external hash group
- * operator.
- */
-public class ExternalGroupClient {
- private static class Options {
- @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
- public String host;
-
- @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
- public int port = 1098;
-
- @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
- public String inFileSplits;
-
- @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
- public String outFileSplits;
-
- @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
- public int htSize = 8191;
-
- @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
- public int frameSize = 32768;
-
- @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 512)", required = false)
- public int sbSize = 512;
-
- @Option(name = "-sort-output", usage = "Whether to sort the output (default: true)", required = false)
- public boolean sortOutput = false;
-
- @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
- public boolean outPlain = true;
-
- @Option(name = "-algo", usage = "The algorithm to be used", required = true)
- public int algo;
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- Options options = new Options();
- CmdLineParser parser = new CmdLineParser(options);
- parser.parseArgument(args);
-
- IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
-
- JobSpecification job;
-
- for (int i = 0; i < 6; i++) {
- long start = System.currentTimeMillis();
- job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i),
- options.htSize, options.sbSize, options.frameSize, options.sortOutput, options.algo,
- options.outPlain);
-
- System.out.print(i + "\t" + (System.currentTimeMillis() - start));
- start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
- System.out.println("\t" + (System.currentTimeMillis() - start));
- }
- }
-
- private static FileSplit[] parseFileSplits(String fileSplits) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
- }
- return fSplits;
- }
-
- private static FileSplit[] parseFileSplits(String fileSplits, int count) {
- String[] splits = fileSplits.split(",");
- FileSplit[] fSplits = new FileSplit[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- String s = splits[i].trim();
- int idx = s.indexOf(':');
- if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
- }
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
- + count)));
- }
- return fSplits;
- }
-
- private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
- int frameSize, boolean sortOutput, int alg, boolean outPlain) {
- JobSpecification spec = new JobSpecification(frameSize);
- IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
-
- RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
- new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
-
- FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
-
- createPartitionConstraint(spec, fileScanner, inSplits);
-
- // Output: each unique string with an integer count
- RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE,
- // IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
-
- // Specify the grouping key, which will be the string extracted during
- // the scan.
- int[] keys = new int[] { 0,
- // 1
- };
-
- AbstractOperatorDescriptor grouper;
-
- switch (alg) {
- case 0: // new external hash graph
- grouper = new org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
- keys, frameSize, new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
- false) }), outDesc, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
-
- break;
- case 1: // External-sort + new-precluster
- ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, frameSize, keys,
- new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, inDesc);
- createPartitionConstraint(spec, sorter2, inSplits);
-
- // Connect scan operator with the sorter
- IConnectorDescriptor scanSortConn2 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
-
- grouper = new org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor(
- spec, keys, new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- outDesc);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect sorter with the pre-cluster
- OneToOneConnectorDescriptor sortGroupConn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
- break;
- case 2: // Inmem
- grouper = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }),
- new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- outDesc, htSize);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanConn2 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanConn2, fileScanner, 0, grouper, 0);
- break;
- default:
- grouper = new org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
- keys, frameSize, new IBinaryComparatorFactory[] {
- // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
- new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
- false) }), outDesc, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
-
- createPartitionConstraint(spec, grouper, outSplits);
-
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
- spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
- }
-
- IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
-
- AbstractSingleActivityOperatorDescriptor writer;
-
- if (outPlain)
- writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
- else
- writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
-
- createPartitionConstraint(spec, writer, outSplits);
-
- IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(groupOutConn, grouper, 0, writer, 0);
-
- spec.addRoot(writer);
- return spec;
- }
-
- private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
- String[] parts = new String[splits.length];
- for (int i = 0; i < splits.length; ++i) {
- parts[i] = splits[i].getNodeName();
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
- }
-}
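The in-memory hash group-by removed above is superseded throughout this patch by the memory-bounded ExternalGroupOperatorDescriptor. A minimal sketch of the new wiring, assuming spec, keys, htSize, fileSize, frameLimit, and outDesc are in scope as in the test above (argument order taken from the Groupby client added later in this change):

    ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize,
            keys, frameLimit,
            new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
            new IntegerNormalizedKeyComputerFactory(),
            // partial step: count the rows of each group while the table fits in frameLimit frames
            new MultiFieldsAggregatorFactory(
                    new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
            // merge step: sum the spilled partial counts, stored immediately after the group keys
            new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                    new IntSumFieldAggregatorFactory(keys.length, false) }),
            outDesc, outDesc,
            new HashSpillableTableFactory(
                    new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }));

Passing a hash-function family instead of a fixed hash-function factory lets the spillable table draw a differently seeded hash function at each recursive spill level.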
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index b9cbb9d..22a571f 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -21,9 +21,6 @@
import java.io.File;
import java.util.EnumSet;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
@@ -31,6 +28,7 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.io.FileReference;
@@ -39,6 +37,7 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -52,14 +51,19 @@
import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
import org.apache.hyracks.examples.text.WordTupleParserFactory;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
public class WordCountMain {
private static class Options {
@@ -84,8 +88,8 @@
@Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
public int htSize = 8191;
- @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
- public int sbSize = 32768;
+        @Option(name = "-frame-limit", usage = "Memory limit in frames (default: 10)", required = false)
+ public int memFrameLimit = 10;
@Option(name = "-runtime-profiling", usage = "Indicates if runtime profiling should be enabled. (default: false)")
public boolean runtimeProfiling = false;
@@ -94,6 +98,8 @@
public int frameSize = 32768;
}
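+    // Total bytes across all input splits, accumulated while parsing them below and passed to
+    // the external group-by as its expected input size.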
+ private static long fileSize = 0;
+
public static void main(String[] args) throws Exception {
Options options = new Options();
CmdLineParser parser = new CmdLineParser(options);
@@ -102,7 +108,7 @@
IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
- options.algo, options.htSize, options.sbSize, options.format, options.frameSize);
+ options.algo, options.htSize, options.memFrameLimit, options.format, options.frameSize);
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job,
@@ -121,13 +127,15 @@
if (idx < 0) {
throw new IllegalArgumentException("File split " + s + " not well formed");
}
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ File file = new File(s.substring(idx + 1));
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(file));
+ fileSize += file.length();
}
return fSplits;
}
private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, String algo, int htSize,
- int sbSize, String format, int frameSize) {
+ int frameLimit, String format, int frameSize) {
JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
@@ -144,40 +152,39 @@
IOperatorDescriptor gBy;
int[] keys = new int[] { 0 };
if ("hash".equalsIgnoreCase(algo)) {
- gBy = new HashGroupOperatorDescriptor(
- spec,
- keys,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
+ gBy = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- groupResultDesc, htSize);
+ new UTF8StringNormalizedKeyComputerFactory(),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(keys.length, false) }),
+ groupResultDesc, groupResultDesc, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+
createPartitionConstraint(spec, gBy, outSplits);
IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
} else {
- IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) };
- IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo) ? new InMemorySortOperatorDescriptor(spec,
- keys, new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc)
- : new ExternalSortOperatorDescriptor(spec, sbSize, keys,
+ IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+ IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo)
+ ? new InMemorySortOperatorDescriptor(spec, keys, new UTF8StringNormalizedKeyComputerFactory(), cfs,
+ wordDesc)
+ : new ExternalSortOperatorDescriptor(spec, frameLimit, keys,
new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
createPartitionConstraint(spec, sorter, outSplits);
IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(scanSortConn, wordScanner, 0, sorter, 0);
- gBy = new PreclusteredGroupOperatorDescriptor(
- spec,
- keys,
+ gBy = new PreclusteredGroupOperatorDescriptor(spec, keys,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new MultiFieldsAggregatorFactory(
new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
@@ -188,8 +195,9 @@
}
IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
- IOperatorDescriptor writer = "text".equalsIgnoreCase(format) ? new PlainFileWriterOperatorDescriptor(spec,
- outSplitProvider, ",") : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+ IOperatorDescriptor writer = "text".equalsIgnoreCase(format)
+ ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
+ : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
createPartitionConstraint(spec, writer, outSplits);
IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
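parseFileSplits in this client now also totals the physical size of the inputs, and that byte count is handed to ExternalGroupOperatorDescriptor as its expected input size. A condensed sketch of the same computation as a hypothetical helper (FileSplit as imported above):

    // Sum the on-disk size of every input split; the total serves as the
    // expected-input-size hint for the memory-bounded group-by.
    private static long totalSize(FileSplit[] splits) {
        long bytes = 0;
        for (FileSplit split : splits) {
            bytes += split.getLocalFile().getFile().length();
        }
        return bytes;
    }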
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index cd45536..a52d21e 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -27,44 +27,79 @@
<version>0.2.17-SNAPSHOT</version>
</parent>
- <dependencies>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-dataflow-std</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-data-std</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>appassembler-maven-plugin</artifactId>
- <version>1.3</version>
- <executions>
- <execution>
- <configuration>
- <programs>
- <program>
- <mainClass>org.apache.hyracks.examples.tpch.client.Sort</mainClass>
- <name>tpchclient</name>
- </program>
- </programs>
- <repositoryLayout>flat</repositoryLayout>
- <repositoryName>lib</repositoryName>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>assemble</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.3</version>
+ <executions>
+ <execution>
+ <id>sort</id>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>org.apache.hyracks.examples.tpch.client.Sort</mainClass>
+ <name>sort</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>group</id>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>org.apache.hyracks.examples.tpch.client.Groupby</mainClass>
+ <name>group</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>join</id>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>org.apache.hyracks.examples.tpch.client.Join</mainClass>
+ <name>join</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
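The packaged client now ships three launchers instead of the single tpchclient script, one per main class above. Assuming appassembler's default bin/ layout, a hypothetical invocation of the new group-by client (placeholder host, paths, and sizes; option names as defined in Groupby.java below):

    bin/group -host cchost -infile-splits NC1:/data/tpch/lineitem.tbl \
        -outfile-splits NC1:/tmp/gby_out -input-tuples 6000000 \
        -input-size 759863287 -frame-limit 4 -algo hash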
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
index ac172fd..15d7f66 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
@@ -27,8 +27,12 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -56,12 +60,36 @@
new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
new UTF8StringSerializerDeserializer() });
- static IValueParserFactory[] orderParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
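+    // TPC-H LINEITEM: 16 columns (5 ints, 3 floats, 8 strings); the parser factories below follow the same layout.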
+ static RecordDescriptor lineitemDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+ new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
+ static IValueParserFactory[] lineitemParserFactories = new IValueParserFactory[] {
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, };
+
+ static IValueParserFactory[] custParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE };
+ static IValueParserFactory[] orderParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
static FileSplit[] parseFileSplits(String fileSplits) {
String[] splits = fileSplits.split(",");
@@ -77,6 +105,21 @@
return fSplits;
}
+ static FileSplit[] parseFileSplits(String fileSplits, int count) {
+ String[] splits = fileSplits.split(",");
+ FileSplit[] fSplits = new FileSplit[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ String s = splits[i].trim();
+ int idx = s.indexOf(':');
+ if (idx < 0) {
+ throw new IllegalArgumentException("File split " + s + " not well formed");
+ }
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
+ + count)));
+ }
+ return fSplits;
+ }
+
static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
String[] parts = new String[splits.length];
for (int i = 0; i < splits.length; ++i) {
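The new counted overload of parseFileSplits appends an underscore and the given counter to each local path, so one logical split spec can address numbered partitions of the same file. For illustration (hypothetical node name and path):

    // "NC1:/data/orders.tbl" with count 2 yields a split over /data/orders.tbl_2 on node NC1
    FileSplit[] splits = Common.parseFileSplits("NC1:/data/orders.tbl", 2);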
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
new file mode 100644
index 0000000..2ef5835
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.examples.tpch.client;
+
+import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
+import static org.apache.hyracks.examples.tpch.client.Common.lineitemDesc;
+import static org.apache.hyracks.examples.tpch.client.Common.lineitemParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
+
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+/**
+ * The application client for performance tests of the group-by operator.
+ */
+public class Groupby {
+ private static class Options {
+ @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+ public String host;
+
+ @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
+ public int port = 1098;
+
+ @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+ public String inFileSplits;
+
+ @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+ public String outFileSplits;
+
+        @Option(name = "-input-tuples", usage = "Expected number of input tuples (used as the hash table size)", required = true)
+        public int htSize;
+
+        @Option(name = "-input-size", usage = "Physical input file size in bytes", required = true)
+        public long fileSize;
+
+ @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
+ public int frameSize = 32768;
+
+        @Option(name = "-frame-limit", usage = "Memory limit in frames for the group-by (default: 4)", required = false)
+ public int frameLimit = 4;
+
+ @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
+ public boolean outPlain = true;
+
+ @Option(name = "-algo", usage = "The algorithm to be used: hash|sort", required = true)
+ public String algo = "hash";
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ if (args.length == 0) {
+ parser.printUsage(System.err);
+ return;
+ }
+ parser.parseArgument(args);
+
+ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+ JobSpecification job;
+
+ long start = System.currentTimeMillis();
+ job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits), options.htSize,
+ options.fileSize, options.frameLimit, options.frameSize, options.algo, options.outPlain);
+ if (job != null) {
+            System.out.println("CreateJobTime: " + (System.currentTimeMillis() - start));
+ start = System.currentTimeMillis();
+ JobId jobId = hcc.startJob(job);
+ hcc.waitForCompletion(jobId);
+ System.out.println("JobExecuteTime:" + (System.currentTimeMillis() - start));
+ }
+ }
+
+ private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, long fileSize,
+ int frameLimit, int frameSize, String alg, boolean outPlain) {
+ JobSpecification spec = new JobSpecification(frameSize);
+ IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+
+ FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+ new DelimitedDataTupleParserFactory(lineitemParserFactories, '|'), lineitemDesc);
+
+ createPartitionConstraint(spec, fileScanner, inSplits);
+
+        // Output: each unique group key with its aggregated integer count
+ RecordDescriptor outDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+ // IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+
+        // Specify the grouping key: the first column of LINEITEM.
+ int[] keys = new int[] { 0,
+ // 1
+ };
+
+ AbstractOperatorDescriptor grouper;
+
+        if (alg.equalsIgnoreCase("hash")) { // external hash group-by
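+            // Hash path: build an in-memory hash table within the frameLimit budget, spilling
+            // partitions to disk when it overflows; htSize and fileSize size the table and spills.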
+ grouper = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
+ new IBinaryComparatorFactory[] {
+ // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new IntegerNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(keys.length, false) }),
+ outDesc, outDesc, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }));
+
+ createPartitionConstraint(spec, grouper, outSplits);
+ } else if (alg.equalsIgnoreCase("sort")) {
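+            // Sort path: aggregate while sorting under the same frameLimit budget; partial counts
+            // are produced in the sorted runs and summed during the merge phase.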
+ grouper = new SortGroupByOperatorDescriptor(spec, frameLimit, keys, keys,
+ new IntegerNormalizedKeyComputerFactory(),
+ new IBinaryComparatorFactory[] {
+ // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(keys.length, true) }),
+ outDesc, outDesc, false);
+
+ createPartitionConstraint(spec, grouper, outSplits);
+ } else {
+            System.err.println("unknown group-by algorithm: " + alg);
+ return null;
+ }
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
+ spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
+
+ IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+
+ AbstractSingleActivityOperatorDescriptor writer;
+
+        if (outPlain) {
+            writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+        } else {
+            writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        }
+
+ createPartitionConstraint(spec, writer, outSplits);
+
+ IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(groupOutConn, grouper, 0, writer, 0);
+
+ spec.addRoot(writer);
+ return spec;
+ }
+
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index fbfc5f5..507e1c7 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -18,26 +18,22 @@
*/
package org.apache.hyracks.examples.tpch.client;
-import static org.apache.hyracks.examples.tpch.client.Common.*;
+import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
+import static org.apache.hyracks.examples.tpch.client.Common.custParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
import java.util.EnumSet;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
@@ -45,11 +41,11 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -57,16 +53,21 @@
import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import org.apache.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
public class Join {
private static class Options {
@@ -91,23 +92,23 @@
@Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
public boolean profile = true;
- @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
+ @Option(name = "-table-size", usage = "Table size for in-memory hash join. (default: 8191)", required = false)
public int tableSize = 8191;
- @Option(name = "-algo", usage = "Join types", required = true)
+        @Option(name = "-algo", usage = "Join type: InMem|NestedLoop|Hybrid|Grace", required = true)
public String algo;
// For grace/hybrid hash join only
@Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
public int memSize;
- @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
- public int graceInputSize = 10;
+        @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
+ public int graceInputSize = 100000;
- @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
+        @Option(name = "-records-per-frame", usage = "Records per frame for the grace/hybrid hash join", required = false)
public int graceRecordsPerFrame = 200;
- @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
+        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join (default: 1.2)", required = false)
public double graceFactor = 1.2;
// Whether group-by is processed after the join
@@ -121,6 +122,10 @@
public static void main(String[] args) throws Exception {
Options options = new Options();
CmdLineParser parser = new CmdLineParser(options);
+ if (args.length == 0) {
+ parser.printUsage(System.err);
+ return;
+ }
parser.parseArgument(args);
IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
@@ -129,6 +134,9 @@
parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy, options.frameSize);
+ if (job == null) {
+ return;
+ }
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job,
@@ -141,87 +149,76 @@
private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize)
- throws HyracksDataException {
+ throws HyracksDataException {
JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
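+        // Total the physical input sizes; the sum is used below as the expected input size of the group-by.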
+ long custFileSize = 0;
+ for (int i = 0; i < customerSplits.length; i++) {
+ custFileSize += customerSplits[i].getLocalFile().getFile().length();
+ }
IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
+ long orderFileSize = 0;
+ for (int i = 0; i < orderSplits.length; i++) {
+ orderFileSize += orderSplits[i].getLocalFile().getFile().length();
+ }
FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), Common.ordersDesc);
+ new DelimitedDataTupleParserFactory(orderParserFactories, '|'), Common.ordersDesc);
createPartitionConstraint(spec, ordScanner, orderSplits);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), Common.custDesc);
+ new DelimitedDataTupleParserFactory(custParserFactories, '|'), Common.custDesc);
createPartitionConstraint(spec, custScanner, customerSplits);
IOperatorDescriptor join;
if ("nestedloop".equalsIgnoreCase(algo)) {
- join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), Common.custOrderJoinDesc,
- memSize, false, null);
+ join = new NestedLoopJoinOperatorDescriptor(spec,
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+ Common.custOrderJoinDesc, memSize, false, null);
- } else if ("gracehash".equalsIgnoreCase(algo)) {
- join = new GraceHashJoinOperatorDescriptor(
- spec,
- memSize,
- graceInputSize,
- graceRecordsPerFrame,
- graceFactor,
- new int[] { 0 },
- new int[] { 1 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) },
+ } else if ("inmem".equalsIgnoreCase(algo)) {
+ join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 }, new int[] { 1 },
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- Common.custOrderJoinDesc, null);
+ Common.custOrderJoinDesc, tableSize, null);
- } else if ("hybridhash".equalsIgnoreCase(algo)) {
- join = new HybridHashJoinOperatorDescriptor(
- spec,
- memSize,
- graceInputSize,
- graceRecordsPerFrame,
- graceFactor,
- new int[] { 0 },
- new int[] { 1 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) },
+ } else if ("hybrid".equalsIgnoreCase(algo)) {
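+                // Two tuple-pair comparators (fields 0,1 and 1,0) let the hybrid hash join compare
+                // in either build/probe orientation when partition roles are reversed.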
+ join = new OptimizedHybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceFactor,
+ new int[] { 0 }, new int[] { 1 },
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ Common.custOrderJoinDesc,
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+ null);
+
+ } else if ("grace".equalsIgnoreCase(algo)) {
+ join = new GraceHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceRecordsPerFrame, graceFactor,
+ new int[] { 0 }, new int[] { 1 },
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
Common.custOrderJoinDesc, null);
} else {
- join = new InMemoryHashJoinOperatorDescriptor(
- spec,
- new int[] { 0 },
- new int[] { 1 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) },
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- Common.custOrderJoinDesc, 6000000, null);
+                System.err.println("unknown join algorithm: " + algo);
+ return null;
}
PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(new int[] { 1 }, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(ordJoinConn, ordScanner, 0, join, 1);
IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(custJoinConn, custScanner, 0, join, 0);
IOperatorDescriptor endingOp = join;
@@ -231,29 +228,33 @@
RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
- HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
- spec,
- new int[] { 6 },
- new FieldHashPartitionComputerFactory(new int[] { 6 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }),
+ ExternalGroupOperatorDescriptor gby = new ExternalGroupOperatorDescriptor(spec, tableSize,
+ custFileSize + orderFileSize, new int[] { 6 }, memSize,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
- groupResultDesc, 16);
+ new UTF8StringNormalizedKeyComputerFactory(),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(1, false) }),
+ groupResultDesc, groupResultDesc, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+
createPartitionConstraint(spec, gby, resultSplits);
IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 6 },
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+ new FieldHashPartitionComputerFactory(new int[] { 6 }, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(joinGroupConn, join, 0, gby, 0);
endingOp = gby;
}
IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
- FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+ IOperatorDescriptor writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+
createPartitionConstraint(spec, writer, resultSplits);
IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
@@ -262,63 +263,4 @@
spec.addRoot(writer);
return spec;
}
-
-
-
- static class JoinComparatorFactory implements ITuplePairComparatorFactory {
- private static final long serialVersionUID = 1L;
-
- private final IBinaryComparatorFactory bFactory;
- private final int pos0;
- private final int pos1;
-
- public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
- this.bFactory = bFactory;
- this.pos0 = pos0;
- this.pos1 = pos1;
- }
-
- @Override
- public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
- return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
- }
- }
-
- static class JoinComparator implements ITuplePairComparator {
-
- private final IBinaryComparator bComparator;
- private final int field0;
- private final int field1;
-
- public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
- this.bComparator = bComparator;
- this.field0 = field0;
- this.field1 = field1;
- }
-
- @Override
- public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
- throws HyracksDataException {
- int tStart0 = accessor0.getTupleStartOffset(tIndex0);
- int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
-
- int tStart1 = accessor1.getTupleStartOffset(tIndex1);
- int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
- int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
- int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
- int fLen0 = fEnd0 - fStart0;
-
- int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
- int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
- int fLen1 = fEnd1 - fStart1;
-
- int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
- .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- return 0;
- }
- }
}
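The JoinComparatorFactory and JoinComparator helpers deleted above now come from the dataflow-std join package (see the import change earlier in this file). A minimal use, assuming the same UTF8 key fields as this client and ITuplePairComparatorFactory from hyracks-api:

    // compares field 0 of the left tuple against field 1 of the right tuple
    ITuplePairComparatorFactory cmp =
            new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1);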
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
index f2dd519..28e6dd9 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
@@ -42,6 +42,7 @@
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -54,7 +55,6 @@
import org.apache.hyracks.dataflow.std.sort.Algorithm;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
public class Sort {
private static class Options {