Fixed a bug on the merging phase of the external grouper; added a float sum aggregator (with test cases).
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1087 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 41da49a..1cab78d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -401,7 +401,6 @@
setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
topTuples);
} else {
- closeRun(runIndex, runFileReaders, tupleAccessors);
break;
}
}
@@ -565,22 +564,20 @@
* If all tuples in the targeting frame have been
* checked.
*/
- int frameOffset = runIndex * runFrameLimit;
tupleIndices[runIndex] = 0;
- currentFrameIndexInRun[runIndex] = frameOffset;
+ currentFrameIndexInRun[runIndex] = runStart;
/**
* read in batch
*/
currentRunFrames[runIndex] = 0;
- for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
- ByteBuffer buffer = tupleAccessors[frameOffset].getBuffer();
- if (runCursors[runIndex].nextFrame(buffer)) {
- tupleAccessors[frameOffset].reset(buffer);
- if (tupleAccessors[frameOffset].getTupleCount() > 0) {
- existNext = true;
- } else {
- throw new IllegalStateException("illegal: empty run file");
- }
+ for (int j = 0; j < runFrameLimit; j++) {
+ int frameIndex = currentFrameIndexInRun[runIndex]
+ + j;
+ if (runCursors[runIndex].nextFrame(inFrames
+ .get(frameIndex))) {
+ tupleAccessors[frameIndex].reset(inFrames
+ .get(frameIndex));
+ existNext = true;
currentRunFrames[runIndex]++;
} else {
break;
@@ -611,7 +608,10 @@
if (runCursors[index] != null) {
runCursors[index].close();
runCursors[index] = null;
- tupleAccessor[index] = null;
+ int frameOffset = index * runFrameLimit;
+ for (int j = 0; j < runFrameLimit; j++) {
+ tupleAccessor[frameOffset + j] = null;
+ }
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
new file mode 100644
index 0000000..f8b1d74
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+
+/**
+ *
+ */
+public class FloatSumFieldAggregatorFactory implements
+ IFieldAggregateDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int aggField;
+
+ private final boolean useObjectState;
+
+ public FloatSumFieldAggregatorFactory(int aggField, boolean useObjState){
+ this.aggField = aggField;
+ this.useObjectState = useObjState;
+ }
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+ */
+ @Override
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+ return new IFieldAggregateDescriptor() {
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data,
+ int offset, AggregateState state) throws HyracksDataException {
+ float sum;
+ if (!useObjectState) {
+ sum = FloatSerializerDeserializer.getFloat(data, offset);
+ } else {
+ sum = (Float) state.state;
+ }
+ try {
+ fieldOutput.writeFloat(sum);
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+ int offset, AggregateState state) throws HyracksDataException {
+ float sum;
+ if (!useObjectState) {
+ sum = FloatSerializerDeserializer.getFloat(data, offset);
+ } else {
+ sum = (Float) state.state;
+ }
+ try {
+ fieldOutput.writeFloat(sum);
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public boolean needsObjectState() {
+ return useObjectState;
+ }
+
+ @Override
+ public boolean needsBinaryState() {
+ return !useObjectState;
+ }
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex,
+ DataOutput fieldOutput, AggregateState state)
+ throws HyracksDataException {
+ float sum = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+
+ sum += FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+
+ if (!useObjectState) {
+ try {
+ fieldOutput.writeFloat(sum);
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ } else {
+ state.state = sum;
+ }
+ }
+
+ @Override
+ public AggregateState createState() {
+ return new AggregateState(new Float(0.0));
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ byte[] data, int offset, AggregateState state)
+ throws HyracksDataException {
+ float sum = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += FloatSerializerDeserializer.getFloat(accessor.getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+
+ if (!useObjectState) {
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ sum += buf.getFloat(offset);
+ buf.putFloat(offset, sum);
+ } else {
+ sum += (Float) state.state;
+ state.state = sum;
+ }
+ }
+ };
+ }
+
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
index 2813a62..53a3f79 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
@@ -57,6 +57,7 @@
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
@@ -66,34 +67,54 @@
*/
public class AggregationTest extends AbstractIntegrationTest {
- final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
- new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/lineitem.tbl"))) });
- final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ final RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
- UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|');
+ final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
+ new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|');
- private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
- throws IOException {
+ private AbstractSingleActivityOperatorDescriptor getPrinter(
+ JobSpecification spec, String prefix) throws IOException {
- AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
- new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC1_ID, createTempFile().getAbsolutePath()),
- new FileSplit(NC2_ID, createTempFile().getAbsolutePath()) }), "\t");
+ AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
+ spec, new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC1_ID, createTempFile()
+ .getAbsolutePath()),
+ new FileSplit(NC2_ID, createTempFile()
+ .getAbsolutePath()) }), "\t");
return printer;
}
@@ -102,38 +123,54 @@
public void singleKeySumInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true),
+ new FloatSumFieldAggregatorFactory(5, true) }),
outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -146,34 +183,49 @@
public void singleKeySumPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE});
int[] keyFields = new int[] { 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true),
+ new FloatSumFieldAggregatorFactory(5, true)}),
outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -186,41 +238,64 @@
public void singleKeySumExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE});
int[] keyFields = new int[] { 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(2, false) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }), tableSize), true);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false),
+ new FloatSumFieldAggregatorFactory(5, false)}),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false),
+ new FloatSumFieldAggregatorFactory(3, false)}),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeySumExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -233,38 +308,54 @@
public void singleKeyAvgInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -277,34 +368,49 @@
public void singleKeyAvgPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -317,43 +423,64 @@
public void singleKeyAvgExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
- new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(1, false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
new IntSumFieldAggregatorFactory(2, false),
- new AvgFieldMergeAggregatorFactory(3, false) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }), tableSize), true);
+ new AvgFieldMergeAggregatorFactory(3, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -366,38 +493,52 @@
public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -410,34 +551,47 @@
public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -450,42 +604,63 @@
public void singleKeyMinMaxStringExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(15, true, true) }),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(2, true, true) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }), tableSize), true);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, true) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(2, true,
+ true) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields,
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -498,39 +673,58 @@
public void multiKeySumInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec, keyFields, new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -543,35 +737,51 @@
public void multiKeySumPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec, keyFields,
+ new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -584,43 +794,69 @@
public void multiKeySumExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
- new IntSumFieldAggregatorFactory(3, false) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeySumExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -633,40 +869,60 @@
public void multiKeyAvgInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec, keyFields, new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -679,36 +935,53 @@
public void multiKeyAvgPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec, keyFields,
+ new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -721,46 +994,72 @@
public void multiKeyAvgExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
- new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(1, false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
new IntSumFieldAggregatorFactory(3, false),
- new AvgFieldMergeAggregatorFactory(4, false) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ new AvgFieldMergeAggregatorFactory(4, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyAvgExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -773,39 +1072,58 @@
public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec, keyFields, new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyMinMaxStringInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -818,35 +1136,51 @@
public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec, keyFields,
+ new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringPreClusterGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyMinMaxStringPreClusterGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -859,44 +1193,71 @@
public void multiKeyMinMaxStringExtGroupTest() throws Exception {
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
- desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
int frameLimits = 4;
int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(15, true, true) }),
- new MultiFieldsAggregatorFactory(new int[] { 0, 1 }, new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(2, false),
- new MinMaxStringFieldAggregatorFactory(3, true, true) }), outputRec,
- new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, true) }),
+ new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new MinMaxStringFieldAggregatorFactory(3, true,
+ true) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
-
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyMinMaxStringExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);