Fixed bugs on key fields arrangement when partial grouping values are stored; added test cases to benchmark the new interface.
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@962 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
index f273c37..4cda69e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
@@ -33,6 +33,10 @@
public void reset() {
state = null;
}
+
+ public void close() {
+ state = null;
+ }
public Object getState() {
return state;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
index 31d406d..2327965 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -274,9 +274,14 @@
.createBinaryComparator();
}
+ int[] keyFieldsInPartialResults = new int[keyFields.length];
+ for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+ keyFieldsInPartialResults[i] = i;
+ }
+
final IAggregatorDescriptor aggregator = mergerFactory
.createAggregator(ctx, recordDescriptors[0],
- recordDescriptors[0], keyFields);
+ recordDescriptors[0], keyFields, keyFieldsInPartialResults);
final AggregateState aggregateState = aggregator
.createAggregateStates();
@@ -350,6 +355,7 @@
writer.fail();
throw new HyracksDataException(e);
} finally {
+ aggregateState.close();
writer.close();
}
}
@@ -421,6 +427,7 @@
runFileReaders, tupleAccessors,
topTuples);
} else {
+ closeRun(runIndex, runFileReaders, tupleAccessors);
break;
}
}
@@ -622,6 +629,7 @@
if (runCursors[index] != null) {
runCursors[index].close();
runCursors[index] = null;
+ tupleAccessor[index] = null;
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index f38eeb3..d5edd25 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -108,8 +108,13 @@
ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
tpc = tpcf.createPartitioner();
+ int[] keyFieldsInPartialResults = new int[fields.length];
+ for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+ keyFieldsInPartialResults[i] = i;
+ }
+
this.aggregator = aggregatorFactory.createAggregator(ctx,
- inRecordDescriptor, outRecordDescriptor, fields);
+ inRecordDescriptor, outRecordDescriptor, fields, keyFieldsInPartialResults);
this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
accumulatorSize = 0;
@@ -217,4 +222,10 @@
flushFrame(appender, writer);
}
}
+
+ void close() throws HyracksDataException {
+ for(AggregateState aState : aggregateStates){
+ aState.close();
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
index d13cc5d..b20197d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -124,9 +124,14 @@
final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
: firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ int[] keyFieldsInPartialResults = new int[keyFields.length];
+ for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+ keyFieldsInPartialResults[i] = i;
+ }
+
final IAggregatorDescriptor aggregator = aggregateFactory
.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
- keyFields);
+ keyFields, keyFieldsInPartialResults);
final AggregateState aggregateState = aggregator
.createAggregateStates();
@@ -356,6 +361,7 @@
tPointers = null;
table.close();
frames.clear();
+ aggregateState.close();
}
/**
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
index 65b3873..a0802c8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
@@ -27,6 +27,6 @@
IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException;
+ RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults) throws HyracksDataException;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
index 116992d..1c1c7a2 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupOperatorDescriptor.java
@@ -61,7 +61,7 @@
.getInputRecordDescriptor(getOperatorId(), 0);
final IAggregatorDescriptor aggregator = aggregatorFactory
.createAggregator(ctx, inRecordDesc, recordDescriptors[0],
- groupFields);
+ groupFields, groupFields);
final ByteBuffer copyFrame = ctx.allocateFrame();
final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(
ctx.getFrameSize(), inRecordDesc);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
index f8e10ef..db6e4ab 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/PreclusteredGroupWriter.java
@@ -128,6 +128,7 @@
FrameUtils.flushFrame(appender.getBuffer(), writer);
}
}
+ aggregateState.close();
writer.close();
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
index 6f92cf1..0ae62e5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -55,7 +55,7 @@
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, final int[] keyFields)
+ RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
throws HyracksDataException {
final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
@@ -101,9 +101,9 @@
AggregateState state) throws HyracksDataException {
if (!outputPending) {
resultTupleBuilder.reset();
- for (int i = 0; i < keyFields.length; i++) {
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
resultTupleBuilder.addField(accessor, tIndex,
- keyFields[i]);
+ keyFieldsInPartialResults[i]);
}
DataOutput dos = resultTupleBuilder.getDataOutput();
@@ -137,9 +137,9 @@
if (!outputPending) {
resultTupleBuilder.reset();
- for (int i = 0; i < keyFields.length; i++) {
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
resultTupleBuilder.addField(accessor, tIndex,
- keyFields[i]);
+ keyFieldsInPartialResults[i]);
}
DataOutput dos = resultTupleBuilder.getDataOutput();
@@ -230,7 +230,9 @@
@Override
public void close() {
- // TODO Auto-generated method stub
+ for(int i = 0; i < aggregators.length; i++){
+ aggregators[i].close();
+ }
}
@Override
@@ -275,9 +277,9 @@
AggregateState state) throws HyracksDataException {
if (!initPending) {
stateTupleBuilder.reset();
- for (int i = 0; i < keyFields.length; i++) {
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
stateTupleBuilder.addField(accessor, tIndex,
- keyFields[i]);
+ keyFieldsInPartialResults[i]);
}
DataOutput dos = stateTupleBuilder.getDataOutput();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index d89709f..d8df3a8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -64,562 +64,1124 @@
*/
public class AggregationTests extends AbstractIntegrationTest {
- final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
- new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/lineitem.tbl"))) });
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/lineitem.tbl"))) });
- final RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ final RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
- new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|');
+ final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
+ new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|');
- private AbstractSingleActivityOperatorDescriptor getPrinter(
- JobSpecification spec, String prefix) throws IOException {
+ private AbstractSingleActivityOperatorDescriptor getPrinter(
+ JobSpecification spec, String prefix) throws IOException {
- AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
- spec, new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC1_ID, createTempFile()
- .getAbsolutePath()),
- new FileSplit(NC2_ID, createTempFile()
- .getAbsolutePath()) }), "\t");
+ AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
+ spec, new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC1_ID, createTempFile()
+ .getAbsolutePath()),
+ new FileSplit(NC2_ID, createTempFile()
+ .getAbsolutePath()) }), "\t");
- return printer;
- }
+ return printer;
+ }
- @Test
- public void singleKeySumInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ @Test
+ public void singleKeySumInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 0 };
- int tableSize = 8;
+ int[] keyFields = new int[] { 0 };
+ int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true) }),
- outputRec, tableSize);
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
+ outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
- public void singleKeySumPreClusterGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ @Test
+ public void singleKeySumPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
+ outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeySumInmemGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeySumExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
+
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeySumExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyAvgInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int tableSize = 8;
+
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldAggregatorFactory(1, true) }),
+ outputRec, tableSize);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyAvgPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldAggregatorFactory(1, true) }),
+ outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyAvgExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
+
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldAggregatorFactory(1, false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false),
+ new AvgFieldAggregatorFactory(3, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int tableSize = 8;
+
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec, tableSize);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyMinMaxStringExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
+
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, true) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(2, true,
+ true) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void multiKeySumInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 8, 0 };
+ int tableSize = 8;
+
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
+ outputRec, tableSize);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeySumInmemGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void multiKeySumPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 8, 0 };
- int[] keyFields = new int[] { 0 };
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec, keyFields, new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
+ outputRec);
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true) }),
- outputRec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeySumInmemGroupTest");
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeySumInmemGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- spec.addRoot(printer);
- runTest(spec);
- }
+ @Test
+ public void multiKeySumExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- @Test
- public void singleKeySumExtGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ int[] keyFields = new int[] { 8, 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
- int[] keyFields = new int[] { 0 };
- int frameLimits = 4;
- int tableSize = 8;
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(3, false) }),
- new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(2, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeySumExtGroupTest");
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeySumExtGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- spec.addRoot(printer);
- runTest(spec);
- }
+ @Test
+ public void multiKeyAvgInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- @Test
- public void singleKeyAvgInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ int[] keyFields = new int[] { 8, 0 };
+ int tableSize = 8;
- int[] keyFields = new int[] { 0 };
- int tableSize = 8;
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldAggregatorFactory(1, true) }),
+ outputRec, tableSize);
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldAggregatorFactory(1, true) }), outputRec, tableSize);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyAvgInmemGroupTest");
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- spec.addRoot(printer);
- runTest(spec);
- }
-
- @Test
- public void singleKeyAvgPreClusterGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ @Test
+ public void multiKeyAvgPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 0 };
+ int[] keyFields = new int[] { 8, 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldAggregatorFactory(1, true) }), outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec, keyFields, new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldAggregatorFactory(1, true) }),
+ outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- @Test
- public void singleKeyAvgExtGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ @Test
+ public void multiKeyAvgExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 0 };
- int frameLimits = 4;
- int tableSize = 8;
+ int[] keyFields = new int[] { 8, 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new CountFieldAggregatorFactory(false),
- new AvgFieldAggregatorFactory(1, false) }),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(2, false),
- new AvgFieldAggregatorFactory(3, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldAggregatorFactory(1, false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new IntSumFieldAggregatorFactory(3, false),
+ new AvgFieldAggregatorFactory(4, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyAvgExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- @Test
- public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ @Test
+ public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 0 };
- int tableSize = 8;
+ int[] keyFields = new int[] { 8, 0 };
+ int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }),
- outputRec, tableSize);
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyMinMaxStringInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- @Test
- public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ @Test
+ public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 0 };
+ int[] keyFields = new int[] { 8, 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }),
- outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec, keyFields, new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyMinMaxStringPreClusterGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
-
- @Test
- public void singleKeyMinMaxStringExtGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ @Test
+ public void multiKeyMinMaxStringExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 0 };
- int frameLimits = 4;
- int tableSize = 8;
+ int[] keyFields = new int[] { 8, 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(15, true, true) }),
- new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(2, true, true) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, true) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new MinMaxStringFieldAggregatorFactory(3, true,
+ true) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyMinMaxStringExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
}
diff --git a/hyracks-examples/text-example/textclient/pom.xml b/hyracks-examples/text-example/textclient/pom.xml
index 2642d31..c6f3832 100644
--- a/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks-examples/text-example/textclient/pom.xml
@@ -41,6 +41,7 @@
<artifactId>appassembler-maven-plugin</artifactId>
<executions>
<execution>
+ <id>textclient</id>
<configuration>
<programs>
<program>
@@ -56,6 +57,23 @@
<goal>assemble</goal>
</goals>
</execution>
+ <execution>
+ <id>groupclient</id>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.examples.text.client.ExternalGroupClient</mainClass>
+ <name>groupclient</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
</executions>
</plugin>
<plugin>
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 68e33cb..61b4ad8 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -42,6 +42,11 @@
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
@@ -101,6 +106,9 @@
@Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
public boolean outPlain = true;
+
+ @Option(name = "-algo", usage = "The algorithm to be used", required = true)
+ public int algo;
}
/**
@@ -111,14 +119,17 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+ IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
+ options.port);
JobSpecification job;
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < 6; i++) {
long start = System.currentTimeMillis();
- job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i % 2),
- options.htSize, options.sbSize, options.framesLimit, options.sortOutput, i % 3, options.outPlain);
+ job = createJob(parseFileSplits(options.inFileSplits),
+ parseFileSplits(options.outFileSplits, i),
+ options.htSize, options.sbSize, options.framesLimit,
+ options.sortOutput, options.algo, options.outPlain);
System.out.print(i + "\t" + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
@@ -136,9 +147,11 @@
String s = splits[i].trim();
int idx = s.indexOf(':');
if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
+ throw new IllegalArgumentException("File split " + s
+ + " not well formed");
}
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
+ new File(s.substring(idx + 1))));
}
return fSplits;
}
@@ -150,46 +163,70 @@
String s = splits[i].trim();
int idx = s.indexOf(':');
if (idx < 0) {
- throw new IllegalArgumentException("File split " + s + " not well formed");
+ throw new IllegalArgumentException("File split " + s
+ + " not well formed");
}
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
- + count)));
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
+ new File(s.substring(idx + 1) + "_" + count)));
}
return fSplits;
}
- private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
- int framesLimit, boolean sortOutput, int alg, boolean outPlain) {
+ private static JobSpecification createJob(FileSplit[] inSplits,
+ FileSplit[] outSplits, int htSize, int sbSize, int framesLimit,
+ boolean sortOutput, int alg, boolean outPlain) {
JobSpecification spec = new JobSpecification();
- IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+ IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(
+ inSplits);
- RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor inDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
+ FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(
+ spec, splitsProvider, new DelimitedDataTupleParserFactory(
+ new IValueParserFactory[] {
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|'),
+ inDesc);
createPartitionConstraint(spec, fileScanner, inSplits);
// Output: each unique string with an integer count
- RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE,
- // IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE,
+ // IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
// Specify the grouping key, which will be the string extracted during
// the scan.
@@ -200,128 +237,246 @@
AbstractOperatorDescriptor grouper;
switch (alg) {
- case 0: // External hash group
- grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keys,
- framesLimit,
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new IntegerNormalizedKeyComputerFactory(),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(keys.length) }),
- outDesc, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }), htSize), false);
+ case 0: // External hash group
+ grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keys,
+ framesLimit,
+ new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE },
+ new IntegerNormalizedKeyComputerFactory(),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(
+ keys.length) }),
+ outDesc,
+ new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }),
+ htSize), false);
- createPartitionConstraint(spec, grouper, outSplits);
+ createPartitionConstraint(spec, grouper, outSplits);
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
- break;
- case 1: // External sort + pre-cluster
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
- createPartitionConstraint(spec, sorter, inSplits);
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanGroupConn, fileScanner, 0, grouper, 0);
+ break;
+ case 1: // External sort + pre-cluster
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec, framesLimit, keys, new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
+ createPartitionConstraint(spec, sorter, inSplits);
- // Connect scan operator with the sorter
- IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
+ // Connect scan operator with the sorter
+ IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanSortConn, fileScanner, 0, sorter, 0);
- grouper = new PreclusteredGroupOperatorDescriptor(spec, keys, new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc);
+ grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keys,
+ new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ outDesc);
- createPartitionConstraint(spec, grouper, outSplits);
+ createPartitionConstraint(spec, grouper, outSplits);
- // Connect sorter with the pre-cluster
- OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(sortGroupConn, sorter, 0, grouper, 0);
- break;
- case 2: // In-memory hash group
- grouper = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }), new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
- new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), outDesc,
- htSize);
+ // Connect sorter with the pre-cluster
+ OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(
+ spec);
+ spec.connect(sortGroupConn, sorter, 0, grouper, 0);
+ break;
+ case 2: // In-memory hash group
+ grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keys,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ outDesc, htSize);
- createPartitionConstraint(spec, grouper, outSplits);
+ createPartitionConstraint(spec, grouper, outSplits);
- // Connect scanner with the grouper
- IConnectorDescriptor scanConn = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanConn, fileScanner, 0, grouper, 0);
- break;
- default:
- grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keys,
- framesLimit,
- new IBinaryComparatorFactory[] {
- // IntegerBinaryComparatorFactory.INSTANCE,
- IntegerBinaryComparatorFactory.INSTANCE },
- new IntegerNormalizedKeyComputerFactory(),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
- new MultiAggregatorDescriptorFactory(
- new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(keys.length) }),
- outDesc, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keys,
- new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }), htSize), false);
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanConn = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanConn, fileScanner, 0, grouper, 0);
+ break;
+ case 3: // new external hash graph
+ grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.ExternalGroupOperatorDescriptor(
+ spec,
+ keys,
+ framesLimit,
+ new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE },
+ new IntegerNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(
+ false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(
+ keys.length, false) }), outDesc,
+ new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }),
+ htSize), false);
+
+ createPartitionConstraint(spec, grouper, outSplits);
- createPartitionConstraint(spec, grouper, outSplits);
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
+
+ break;
+ case 4: // External-sort + new-precluster
+ ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(
+ spec, framesLimit, keys, new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
+ createPartitionConstraint(spec, sorter2, inSplits);
- // Connect scanner with the grouper
- IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
- // IntegerBinaryHashFunctionFactory.INSTANCE,
- IntegerBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
+ // Connect scan operator with the sorter
+ IConnectorDescriptor scanSortConn2 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
+
+ grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.PreclusteredGroupOperatorDescriptor(
+ spec,
+ keys,
+ new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+ outDesc);
+
+ createPartitionConstraint(spec, grouper, outSplits);
+
+ // Connect sorter with the pre-cluster
+ OneToOneConnectorDescriptor sortGroupConn2 = new OneToOneConnectorDescriptor(
+ spec);
+ spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
+ break;
+ case 5: // Inmem
+ grouper = new edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor(
+ spec,
+ keys,
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+ outDesc, htSize);
+
+ createPartitionConstraint(spec, grouper, outSplits);
+
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanConn2 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanConn2, fileScanner, 0, grouper, 0);
+ break;
+ default:
+ grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keys,
+ framesLimit,
+ new IBinaryComparatorFactory[] {
+ // IntegerBinaryComparatorFactory.INSTANCE,
+ IntegerBinaryComparatorFactory.INSTANCE },
+ new IntegerNormalizedKeyComputerFactory(),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] { new CountAggregatorDescriptorFactory() }),
+ new MultiAggregatorDescriptorFactory(
+ new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(
+ keys.length) }),
+ outDesc,
+ new HashSpillableGroupingTableFactory(
+ new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }),
+ htSize), false);
+
+ createPartitionConstraint(spec, grouper, outSplits);
+
+ // Connect scanner with the grouper
+ IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keys,
+ new IBinaryHashFunctionFactory[] {
+ // IntegerBinaryHashFunctionFactory.INSTANCE,
+ IntegerBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
}
- IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+ IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
+ outSplits);
AbstractSingleActivityOperatorDescriptor writer;
if (outPlain)
- writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+ writer = new PlainFileWriterOperatorDescriptor(spec,
+ outSplitProvider, "|");
else
- writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+ writer = new FrameFileWriterOperatorDescriptor(spec,
+ outSplitProvider);
createPartitionConstraint(spec, writer, outSplits);
- IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
+ IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(
+ spec);
spec.connect(groupOutConn, grouper, 0, writer, 0);
spec.addRoot(writer);
return spec;
}
- private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+ private static void createPartitionConstraint(JobSpecification spec,
+ IOperatorDescriptor op, FileSplit[] splits) {
String[] parts = new String[splits.length];
for (int i = 0; i < splits.length; ++i) {
parts[i] = splits[i].getNodeName();
}
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
+ PartitionConstraintHelper
+ .addAbsoluteLocationConstraint(spec, op, parts);
}
}
\ No newline at end of file