Updated the aggregator interfaces to build aggregate states and results through ArrayTupleBuilder; removed the state and output length calculator methods. Added FrameTupleAppender.appendSkipEmptyField() so tuples whose trailing state fields were never written can be appended directly.
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@977 123451ca-8445-de46-9d55-352943316053
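For reviewers, the shape of the change: aggregators no longer write binary state at a caller-supplied byte offset, so the length-calculator methods (getBinaryAggregateStateLength, getPartialOutputLength, getFinalOutputLength) are gone. Instead the aggregator serializes into an ArrayTupleBuilder and the caller appends the built tuple, flushing the frame and retrying once on overflow. A minimal sketch of the calling idiom used throughout this patch (assuming ctx, frame, writer, accessor, tIndex, and state are in scope):

    ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDesc.getFields().length);
    FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
    appender.reset(frame, true);

    tb.reset();
    aggregator.outputFinalResult(tb, accessor, tIndex, state);
    if (!appender.appendSkipEmptyField(tb.getFieldEndOffsets(),
            tb.getByteArray(), 0, tb.getSize())) {
        FrameUtils.flushFrame(frame, writer); // frame full: flush, then retry once
        appender.reset(frame, true);
        if (!appender.appendSkipEmptyField(tb.getFieldEndOffsets(),
                tb.getByteArray(), 0, tb.getSize())) {
            throw new HyracksDataException("Tuple does not fit in an empty frame.");
        }
    }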
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 7647e50..56ba785 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -59,6 +59,25 @@
}
return false;
}
+
+ public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) {
+ if (tupleDataEndOffset + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ int effectiveSlots = 0;
+ // Pack slot offsets contiguously; fields with a zero end offset were never
+ // written by the tuple builder and get no slot entry.
+ for (int i = 0; i < fieldSlots.length; ++i) {
+ if (fieldSlots[i] > 0) {
+ buffer.putInt(tupleDataEndOffset + effectiveSlots * 4, fieldSlots[i]);
+ effectiveSlots++;
+ }
+ }
+ System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + effectiveSlots * 4, length);
+ tupleDataEndOffset += effectiveSlots * 4 + length;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+ return true;
+ }
+ return false;
+ }
public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
int length = tEndOffset - tStartOffset;
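Why a new append variant: state tuple builders are sized for the maximum possible field count, but field aggregators that keep object state never call addFieldEndOffset(), so the builder's field-end-offset array keeps trailing zeros. appendSkipEmptyField() drops those zero slots instead of writing bogus offsets. A hedged illustration, assuming ArrayTupleBuilder pre-allocates the offset array at its declared size (values hypothetical; exception handling omitted):

    // Builder declared for 2 fields, but only one field is ever written:
    ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
    tb.getDataOutput().writeInt(42); // field 0 payload (4 bytes)
    tb.addFieldEndOffset();          // getFieldEndOffsets() == {4, 0}

    // Writes one 4-byte slot followed by the 4 data bytes; plain append()
    // would have emitted a second, meaningless slot for the empty field.
    appender.appendSkipEmptyField(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());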
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index a0a2dd2..2edbf97 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -39,7 +39,9 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
@@ -293,6 +295,9 @@
storedKeys[i] = i;
}
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
+ recordDescriptors[0].getFields().length);
+
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
/**
* Input frames, one for each run file.
@@ -303,12 +308,16 @@
* Output frame.
*/
private ByteBuffer outFrame, writerFrame;
- private int outFrameOffset, writerFrameOffset;
+ private final FrameTupleAppender outAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ private FrameTupleAppender writerAppender;
private LinkedList<RunFileReader> runs;
private AggregateActivityState aggState;
+ private ArrayTupleBuilder finalTupleBuilder;
+
/**
* how many frames to be read ahead once
*/
@@ -339,7 +348,7 @@
runs = new LinkedList<RunFileReader>(runs);
inFrames = new ArrayList<ByteBuffer>();
outFrame = ctx.allocateFrame();
- outFrameOffset = 0;
+ outAppender.reset(outFrame, true);
outFrameAccessor.reset(outFrame);
while (runs.size() > 0) {
try {
@@ -456,31 +465,25 @@
* tuple builder
*/
- int stateLength = aggregator
- .getBinaryAggregateStateLength(fta,
- tupleIndex, aggregateState);
+ tupleBuilder.reset();
- if (FrameToolsForGroupers.isFrameOverflowing(
- outFrame, stateLength, (outFrameOffset == 0))) {
+ aggregator.init(tupleBuilder, fta, tupleIndex,
+ aggregateState);
+
+ if (!outAppender.appendSkipEmptyField(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
flushOutFrame(writer, finalPass);
- if (FrameToolsForGroupers
- .isFrameOverflowing(outFrame,
- stateLength, (outFrameOffset == 0))) {
+ if (!outAppender.appendSkipEmptyField(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
throw new HyracksDataException(
"The partial result is too large to be initialized in a frame.");
}
}
- aggregator.init(outFrame.array(),
- outFrameOffset, fta, tupleIndex,
- aggregateState);
-
- FrameToolsForGroupers
- .updateFrameMetaForNewTuple(outFrame,
- stateLength, (outFrameOffset == 0));
-
- outFrameOffset += stateLength;
-
} else {
/**
* if new tuple is in the same group of the
@@ -498,8 +501,9 @@
runFileReaders, tupleAccessors, topTuples);
}
- if (outFrameOffset > 0) {
+ if (outAppender.getTupleCount() > 0) {
flushOutFrame(writer, finalPass);
+ outAppender.reset(outFrame, true);
}
aggregator.close();
@@ -521,68 +525,56 @@
private void flushOutFrame(IFrameWriter writer, boolean isFinal)
throws HyracksDataException {
+
+ if (finalTupleBuilder == null) {
+ finalTupleBuilder = new ArrayTupleBuilder(
+ recordDescriptors[0].getFields().length);
+ }
+
if (writerFrame == null) {
writerFrame = ctx.allocateFrame();
- writerFrameOffset = 0;
+ }
+
+ if (writerAppender == null) {
+ writerAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ writerAppender.reset(writerFrame, true);
}
outFrameAccessor.reset(outFrame);
for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
- int outputLen;
+ finalTupleBuilder.reset();
if (isFinal) {
- outputLen = aggregator.getFinalOutputLength(
- outFrameAccessor, i, aggregateState);
-
- if (FrameToolsForGroupers.isFrameOverflowing(
- writerFrame, outputLen, (writerFrameOffset == 0))) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerFrameOffset = 0;
- if (FrameToolsForGroupers.isFrameOverflowing(
- writerFrame, outputLen, (writerFrameOffset == 0))) {
- throw new HyracksDataException(
- "Final aggregation output is too large to be fit into a frame.");
- }
- }
-
- aggregator.outputFinalResult(writerFrame.array(),
- writerFrameOffset, outFrameAccessor, i,
+ aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i,
aggregateState);
+
} else {
- outputLen = aggregator.getPartialOutputLength(
- outFrameAccessor, i, aggregateState);
-
- if (FrameToolsForGroupers.isFrameOverflowing(
- writerFrame, outputLen, (writerFrameOffset == 0))) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerFrameOffset = 0;
- if (FrameToolsForGroupers.isFrameOverflowing(
- writerFrame, outputLen, (writerFrameOffset == 0))) {
- throw new HyracksDataException(
- "Final aggregation output is too large to be fit into a frame.");
- }
- }
-
- aggregator.outputPartialResult(writerFrame.array(),
- writerFrameOffset, outFrameAccessor, i,
+ aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i,
aggregateState);
}
-
- FrameToolsForGroupers.updateFrameMetaForNewTuple(
- writerFrame, outputLen, (writerFrameOffset == 0));
-
- writerFrameOffset += outputLen;
+
+ if (!writerAppender.appendSkipEmptyField(
+ finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0,
+ finalTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerAppender.reset(writerFrame, true);
+ if (!writerAppender.appendSkipEmptyField(
+ finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0,
+ finalTupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "Aggregation output is too large to fit into a frame.");
+ }
+ }
}
- if (writerFrameOffset > 0) {
+ if (writerAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(writerFrame, writer);
- writerFrameOffset = 0;
+ writerAppender.reset(writerFrame, true);
}
- outFrameOffset = 0;
+
+ outAppender.reset(outFrame, true);
}
private void setNextTopTuple(int runIndex, int[] tupleIndices,
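Note on the merge path above: writerFrame, writerAppender, and finalTupleBuilder are allocated lazily inside flushOutFrame() because a merge pass that produces no output never needs them, while outAppender lives for the whole activity and is reset against outFrame after every flush. A condensed sketch of the per-group flow during the merge (names from this patch):

    tupleBuilder.reset();
    aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState); // start a new group
    if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
        flushOutFrame(writer, finalPass); // drains and resets outAppender
        if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
            throw new HyracksDataException(
                    "The partial result is too large to be initialized in a frame.");
        }
    }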
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
index 853ae36..f4cfe98 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
@@ -28,7 +28,9 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
class GroupingHashTable {
@@ -78,11 +80,12 @@
private final ITuplePartitionComputer tpc;
private final IAggregatorDescriptor aggregator;
- private int bufferOffset;
- private int tupleCountInBuffer;
+ private final FrameTupleAppender appender;
private final FrameTupleAccessor storedKeysAccessor;
+ private final ArrayTupleBuilder stateTupleBuilder, outputTupleBuilder;
+
GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
IBinaryComparatorFactory[] comparatorFactories,
ITuplePartitionComputerFactory tpcf,
@@ -127,7 +130,20 @@
storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
storedKeysRecordDescriptor);
lastBIndex = -1;
+
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+
addNewBuffer();
+
+ // Size the state tuple builder: when every output field is a group key,
+ // reserve one extra field for the binary aggregate state.
+ if (fields.length < outRecordDescriptor.getFields().length) {
+ stateTupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length);
+ } else {
+ stateTupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length + 1);
+ }
+ outputTupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length);
}
private void addNewBuffer() {
@@ -135,8 +151,7 @@
buffer.position(0);
buffer.limit(buffer.capacity());
buffers.add(buffer);
- bufferOffset = 0;
- tupleCountInBuffer = 0;
+ appender.reset(buffer, true);
++lastBIndex;
}
@@ -165,29 +180,22 @@
// Add aggregation fields
AggregateState newState = aggregator.createAggregateStates();
- int initLength = aggregator.getBinaryAggregateStateLength(accessor,
- tIndex, newState);
+ stateTupleBuilder.reset();
- if (FrameToolsForGroupers.isFrameOverflowing(
- buffers.get(lastBIndex), initLength, (bufferOffset == 0))) {
+ aggregator.init(stateTupleBuilder, accessor, tIndex, newState);
+
+ if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+ stateTupleBuilder.getByteArray(), 0,
+ stateTupleBuilder.getSize())) {
addNewBuffer();
- if (bufferOffset + initLength > ctx.getFrameSize()) {
+ if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+ stateTupleBuilder.getByteArray(), 0,
+ stateTupleBuilder.getSize())) {
throw new HyracksDataException(
- "Cannot initialize the aggregation state within a frame.");
+ "Cannot init the aggregate state in a single frame.");
}
}
-
- aggregator.init(buffers.get(lastBIndex).array(), bufferOffset,
- accessor, tIndex, newState);
-
- FrameToolsForGroupers.updateFrameMetaForNewTuple(
- buffers.get(lastBIndex), initLength, (bufferOffset == 0));
-
- bufferOffset += initLength;
-
- // Update tuple count in frame
- tupleCountInBuffer++;
-
+
if (accumulatorSize >= aggregateStates.length) {
aggregateStates = Arrays.copyOf(aggregateStates,
aggregateStates.length * 2);
@@ -195,7 +203,7 @@
aggregateStates[saIndex] = newState;
- link.add(lastBIndex, tupleCountInBuffer - 1, saIndex);
+ link.add(lastBIndex, appender.getTupleCount() - 1, saIndex);
} else {
aggregator.aggregate(accessor, tIndex, null, 0,
@@ -205,7 +213,7 @@
void write(IFrameWriter writer) throws HyracksDataException {
ByteBuffer buffer = ctx.allocateFrame();
- int bufOffset = 0;
+ appender.reset(buffer, true);
for (int i = 0; i < table.length; ++i) {
Link link = table[i];
@@ -216,38 +224,27 @@
int aIndex = link.pointers[j + 2];
ByteBuffer keyBuffer = buffers.get(bIndex);
storedKeysAccessor.reset(keyBuffer);
-
- int outputLen = aggregator
- .getFinalOutputLength(storedKeysAccessor, tIndex,
- aggregateStates[aIndex]);
-
- if (FrameToolsForGroupers.isFrameOverflowing(buffer,
- outputLen, (bufOffset == 0))) {
- writer.nextFrame(buffer);
- bufOffset = 0;
- if (FrameToolsForGroupers.isFrameOverflowing(buffer,
- outputLen, (bufOffset == 0))) {
- throw new HyracksDataException(
- "Cannot write aggregation output in a frame.");
- }
- }
+
+ outputTupleBuilder.reset();
aggregator
- .outputFinalResult(buffer.array(), bufOffset,
+ .outputFinalResult(outputTupleBuilder,
storedKeysAccessor, tIndex,
aggregateStates[aIndex]);
-
- FrameToolsForGroupers.updateFrameMetaForNewTuple(buffer,
- outputLen, (bufOffset == 0));
-
- bufOffset += outputLen;
+
+ if (!appender.appendSkipEmptyField(
+ outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ writer.nextFrame(buffer);
+ appender.reset(buffer, true);
+ if (!appender.appendSkipEmptyField(
+ outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "Cannot write the aggregation output into a frame.");
+ }
+ }
}
}
}
- if (bufOffset != 0) {
+ if (appender.getTupleCount() != 0) {
writer.nextFrame(buffer);
- bufOffset = 0;
}
}
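Seen from the operator, the table's contract is unchanged: insert tuples during the build phase, then drain every group. A hedged usage sketch (the caller names and the insert(FrameTupleAccessor, int) signature are assumptions inferred from the diff context; write(IFrameWriter) appears above):

    accessor.reset(inputBuffer);
    for (int t = 0; t < accessor.getTupleCount(); ++t) {
        table.insert(accessor, t); // creates a new state tuple or aggregates in place
    }
    table.write(writer);           // builds one output tuple per group via outputTupleBuilder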
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 5cff298..6bd053f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -30,7 +30,9 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -113,6 +115,7 @@
final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(
keyFields, storedKeys, comparators);
+
final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(
storedKeys, storedKeys, comparators);
@@ -133,13 +136,27 @@
final AggregateState aggregateState = aggregator
.createAggregateStates();
+ final ArrayTupleBuilder stateTupleBuilder;
+ // Reserve an extra field for the binary aggregate state when the keys
+ // already occupy every output field.
+ if (keyFields.length < outRecordDescriptor.getFields().length) {
+ stateTupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length);
+ } else {
+ stateTupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length + 1);
+ }
+
+ final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(
+ outRecordDescriptor.getFields().length);
+
return new ISpillableTable() {
private int lastBufIndex;
- private int outFrameOffset;
- private int tupleCountInOutFrame;
private ByteBuffer outputFrame;
+ private FrameTupleAppender outputAppender;
+
+ private FrameTupleAppender stateAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
private final ISerializableTable table = new SerializableHashTable(
tableSize, ctx);
@@ -229,27 +246,29 @@
if (!foundGroup) {
- int initLen = aggregator.getBinaryAggregateStateLength(
- accessor, tIndex, aggregateState);
+ stateTupleBuilder.reset();
- if (FrameToolsForGroupers.isFrameOverflowing(
- frames.get(lastBufIndex), initLen, (outFrameOffset == 0))) {
+ aggregator.init(stateTupleBuilder, accessor, tIndex,
+ aggregateState);
+ if (!stateAppender.appendSkipEmptyField(
+ stateTupleBuilder.getFieldEndOffsets(),
+ stateTupleBuilder.getByteArray(), 0,
+ stateTupleBuilder.getSize())) {
if (!nextAvailableFrame()) {
return false;
}
+ if (!stateAppender.appendSkipEmptyField(
+ stateTupleBuilder.getFieldEndOffsets(),
+ stateTupleBuilder.getByteArray(), 0,
+ stateTupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "Cannot init external aggregate state in a frame.");
+ }
}
- aggregator.init(frames.get(lastBufIndex).array(),
- outFrameOffset, accessor, tIndex, aggregateState);
-
- FrameToolsForGroupers.updateFrameMetaForNewTuple(
- frames.get(lastBufIndex), initLen, (outFrameOffset == 0));
-
- outFrameOffset += initLen;
- tupleCountInOutFrame++;
-
storedTuplePointer.frameIndex = lastBufIndex;
- storedTuplePointer.tupleIndex = tupleCountInOutFrame - 1;
+ storedTuplePointer.tupleIndex = stateAppender
+ .getTupleCount() - 1;
table.insert(entry, storedTuplePointer);
} else {
@@ -277,7 +296,13 @@
outputFrame = ctx.allocateFrame();
}
- int outputFrameOffset = 0;
+ if (outputAppender == null) {
+ outputAppender = new FrameTupleAppender(
+ outputFrame.capacity());
+ }
+
+ outputAppender.reset(outputFrame, true);
+
writer.open();
if (tPointers == null) {
@@ -295,64 +320,46 @@
storedKeysAccessor1.reset(frames.get(bIndex));
- int outputLen;
+ outputTupleBuilder.reset();
if (isPartial) {
- outputLen = aggregator.getPartialOutputLength(
- storedKeysAccessor1, tIndex,
- aggregateState);
-
- if (FrameToolsForGroupers.isFrameOverflowing(
- outputFrame, outputLen, (outputFrameOffset == 0))) {
- FrameUtils.flushFrame(outputFrame, writer);
- outputFrameOffset = 0;
- if (FrameToolsForGroupers
- .isFrameOverflowing(outputFrame,
- outputLen, (outputFrameOffset == 0))) {
- throw new HyracksDataException(
- "The output item is too large to be fit into a frame.");
- }
- }
-
aggregator.outputPartialResult(
- outputFrame.array(), outputFrameOffset,
+ outputTupleBuilder,
storedKeysAccessor1, tIndex,
aggregateState);
} else {
- outputLen = aggregator.getFinalOutputLength(
- storedKeysAccessor1, tIndex,
- aggregateState);
-
- if (FrameToolsForGroupers.isFrameOverflowing(
- outputFrame, outputLen, (outputFrameOffset == 0))) {
- FrameUtils.flushFrame(outputFrame, writer);
- outputFrameOffset = 0;
- if (FrameToolsForGroupers
- .isFrameOverflowing(outputFrame,
- outputLen, (outputFrameOffset == 0))) {
- throw new HyracksDataException(
- "The output item is too large to be fit into a frame.");
- }
- }
aggregator.outputFinalResult(
- outputFrame.array(), outputFrameOffset,
+ outputTupleBuilder,
storedKeysAccessor1, tIndex,
aggregateState);
}
- FrameToolsForGroupers.updateFrameMetaForNewTuple(
- outputFrame, outputLen, (outputFrameOffset == 0));
-
- outputFrameOffset += outputLen;
+ if (!outputAppender.appendSkipEmptyField(
+ outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputFrame, writer);
+ outputAppender.reset(outputFrame, true);
+ if (!outputAppender
+ .appendSkipEmptyField(
+ outputTupleBuilder
+ .getFieldEndOffsets(),
+ outputTupleBuilder
+ .getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "The output item is too large to be fit into a frame.");
+ }
+ }
} while (true);
}
- if (outputFrameOffset > 0) {
+ if (outputAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(outputFrame, writer);
- outputFrameOffset = 0;
+ outputAppender.reset(outputFrame, true);
}
aggregator.close();
return;
@@ -369,58 +376,41 @@
ByteBuffer buffer = frames.get(frameIndex);
storedKeysAccessor1.reset(buffer);
- int outputLen;
+ outputTupleBuilder.reset();
if (isPartial) {
- outputLen = aggregator
- .getPartialOutputLength(storedKeysAccessor1,
- tupleIndex, aggregateState);
-
- if (FrameToolsForGroupers.isFrameOverflowing(
- outputFrame, outputLen, (outputFrameOffset == 0))) {
- FrameUtils.flushFrame(outputFrame, writer);
- outputFrameOffset = 0;
- if (FrameToolsForGroupers.isFrameOverflowing(
- outputFrame, outputLen, (outputFrameOffset == 0))) {
- throw new HyracksDataException(
- "The output item is too large to be fit into a frame.");
- }
- }
-
- aggregator.outputPartialResult(outputFrame.array(),
- outputFrameOffset, storedKeysAccessor1,
- tupleIndex, aggregateState);
+ aggregator
+ .outputPartialResult(outputTupleBuilder,
+ storedKeysAccessor1, tupleIndex,
+ aggregateState);
} else {
- outputLen = aggregator
- .getFinalOutputLength(storedKeysAccessor1,
- tupleIndex, aggregateState);
- if (FrameToolsForGroupers.isFrameOverflowing(
- outputFrame, outputLen, (outputFrameOffset == 0))) {
- FrameUtils.flushFrame(outputFrame, writer);
- outputFrameOffset = 0;
- if (FrameToolsForGroupers.isFrameOverflowing(
- outputFrame, outputLen, (outputFrameOffset == 0))) {
- throw new HyracksDataException(
- "The output item is too large to be fit into a frame.");
- }
- }
-
- aggregator.outputFinalResult(outputFrame.array(),
- outputFrameOffset, storedKeysAccessor1,
- tupleIndex, aggregateState);
+ aggregator
+ .outputFinalResult(outputTupleBuilder,
+ storedKeysAccessor1, tupleIndex,
+ aggregateState);
}
- FrameToolsForGroupers.updateFrameMetaForNewTuple(
- outputFrame, outputLen, (outputFrameOffset == 0));
-
- outputFrameOffset += outputLen;
+ if (!outputAppender.appendSkipEmptyField(
+ outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputFrame, writer);
+ outputAppender.reset(outputFrame, true);
+ if (!outputAppender.appendSkipEmptyField(
+ outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "The output item is too large to be fit into a frame.");
+ }
+ }
}
- if (outputFrameOffset > 0) {
+ if (outputAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(outputFrame, writer);
- outputFrameOffset = 0;
+ outputAppender.reset(outputFrame, true);
}
aggregator.close();
}
@@ -453,8 +443,7 @@
frame.position(0);
frame.limit(frame.capacity());
frames.add(frame);
- outFrameOffset = 0;
- tupleCountInOutFrame = 0;
+ stateAppender.reset(frame, true);
lastBufIndex = frames.size() - 1;
} else {
// Reuse an old frame
@@ -462,8 +451,7 @@
ByteBuffer frame = frames.get(lastBufIndex);
frame.position(0);
frame.limit(frame.capacity());
- outFrameOffset = 0;
- tupleCountInOutFrame = 0;
+ stateAppender.reset(frame, true);
}
return true;
}
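In the spillable table, the same build-append-retry idiom governs inserts, except that overflow asks for memory via nextAvailableFrame() and returning false tells the caller to spill. A condensed sketch of the path added above (names from this patch):

    stateTupleBuilder.reset();
    aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);
    if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
            stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
        if (!nextAvailableFrame()) {
            return false; // no memory left: caller must spill a run
        }
        if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
                stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
            throw new HyracksDataException(
                    "Cannot initialize the external aggregate state in a frame.");
        }
    }
    storedTuplePointer.frameIndex = lastBufIndex;
    storedTuplePointer.tupleIndex = stateAppender.getTupleCount() - 1;
    table.insert(entry, storedTuplePointer);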
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index 358bf42..329eb4b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -16,6 +16,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
/**
*
@@ -30,20 +31,6 @@
public AggregateState createAggregateStates();
/**
- * Get the length of the binary states.
- *
- * @return
- */
- public int getBinaryAggregateStateLength(IFrameTupleAccessor accessor,
- int tIndex, AggregateState state) throws HyracksDataException;
-
- public int getPartialOutputLength(IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException;
-
- public int getFinalOutputLength(IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException;
-
- /**
* Initialize the state based on the input tuple.
*
* @param accessor
@@ -55,7 +42,7 @@
* The state to be initialized.
* @throws HyracksDataException
*/
- public void init(byte[] buf, int offset, IFrameTupleAccessor accessor,
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
int tIndex, AggregateState state) throws HyracksDataException;
/**
@@ -98,7 +85,7 @@
* The aggregation state.
* @throws HyracksDataException
*/
- public void outputPartialResult(byte[] data, int offset,
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
IFrameTupleAccessor accessor, int tIndex, AggregateState state)
throws HyracksDataException;
@@ -114,7 +101,7 @@
* The aggregation state.
* @throws HyracksDataException
*/
- public void outputFinalResult(byte[] data, int offset,
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
IFrameTupleAccessor accessor, int tIndex, AggregateState state)
throws HyracksDataException;
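To make the revised contract concrete, here is a hedged sketch of init() for a toy single-key int-sum aggregator (illustrative only, not code from this patch; it assumes the key sits in field 0 and the summed value in field 1):

    @Override
    public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
            int tIndex, AggregateState state) throws HyracksDataException {
        tupleBuilder.reset();
        tupleBuilder.addField(accessor, tIndex, 0); // copy the group key
        int tupleOffset = accessor.getTupleStartOffset(tIndex);
        int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
        int sum = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
                tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
        try {
            tupleBuilder.getDataOutput().writeInt(sum); // running sum as a binary field
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
        tupleBuilder.addFieldEndOffset();
    }

The caller, not the aggregator, now decides which frame the finished tuple lands in.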
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index 1471318..32f4365 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -112,15 +112,6 @@
public boolean needsObjectState();
- public int getBinaryStateLength(IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException;
-
- public int getPartialResultLength(byte[] data, int offset,
- AggregateState state) throws HyracksDataException;
-
- public int getFinalResultLength(byte[] data, int offset,
- AggregateState state) throws HyracksDataException;
-
public AggregateState createState();
/**
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index 8ff4942..17ad5f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -76,7 +76,7 @@
@Override
public void open() throws HyracksDataException {
pgw = new PreclusteredGroupWriter(ctx, groupFields,
- comparators, aggregator, inRecordDesc, writer);
+ comparators, aggregator, inRecordDesc, recordDescriptors[0], writer);
pgw.open();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index 17f62f0..226642f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -21,7 +21,9 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class PreclusteredGroupWriter implements IFrameWriter {
@@ -35,13 +37,14 @@
private final FrameTupleAccessor copyFrameAccessor;
private final ByteBuffer outFrame;
- private int outFrameOffset;
+ private final FrameTupleAppender appender;
+ private final ArrayTupleBuilder tupleBuilder;
private boolean first;
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields,
IBinaryComparator[] comparators, IAggregatorDescriptor aggregator,
- RecordDescriptor inRecordDesc, IFrameWriter writer) {
+ RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc, IFrameWriter writer) {
this.groupFields = groupFields;
this.comparators = comparators;
this.aggregator = aggregator;
@@ -55,7 +58,10 @@
copyFrameAccessor.reset(copyFrame);
outFrame = ctx.allocateFrame();
- outFrameOffset = 0;
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(outFrame, true);
+
+ tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
}
@Override
@@ -71,7 +77,9 @@
for (int i = 0; i < nTuples; ++i) {
if (first) {
- aggregator.init(null, 0, inFrameAccessor, i, aggregateState);
+ tupleBuilder.reset();
+
+ aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
first = false;
@@ -97,7 +105,8 @@
currTupleIndex)) {
writeOutput(prevTupleAccessor, prevTupleIndex);
- aggregator.init(null, 0, currTupleAccessor, currTupleIndex,
+ tupleBuilder.reset();
+ aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex,
aggregateState);
} else {
aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0,
@@ -108,26 +117,20 @@
private void writeOutput(final FrameTupleAccessor lastTupleAccessor,
int lastTupleIndex) throws HyracksDataException {
- int outLen = aggregator.getFinalOutputLength(lastTupleAccessor,
- lastTupleIndex, aggregateState);
+ tupleBuilder.reset();
- if (FrameToolsForGroupers.isFrameOverflowing(outFrame, outLen, (outFrameOffset == 0))) {
+ aggregator.outputFinalResult(tupleBuilder,
+ lastTupleAccessor, lastTupleIndex, aggregateState);
+
+ if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
FrameUtils.flushFrame(outFrame, writer);
- outFrameOffset = 0;
- if (FrameToolsForGroupers.isFrameOverflowing(outFrame, outLen, (outFrameOffset == 0))) {
+ appender.reset(outFrame, true);
+ if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
throw new HyracksDataException(
"The output cannot be fit into a frame.");
}
}
- aggregator.outputFinalResult(outFrame.array(), outFrameOffset,
- lastTupleAccessor, lastTupleIndex, aggregateState);
-
- FrameToolsForGroupers.updateFrameMetaForNewTuple(outFrame, outLen,
- (outFrameOffset == 0));
-
- outFrameOffset += outLen;
-
}
private boolean sameGroup(FrameTupleAccessor a1, int t1Idx,
@@ -158,7 +161,7 @@
if (!first) {
writeOutput(copyFrameAccessor,
copyFrameAccessor.getTupleCount() - 1);
- if (outFrameOffset > 0) {
+ if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
}
}
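The writer now takes both record descriptors because the output tuple builder is sized from the output schema rather than probed per tuple. Construction as updated in PreclusteredGroupOperatorDescriptor above:

    pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator,
            inRecordDesc, recordDescriptors[0], writer);
    pgw.open();
    // one nextFrame(buffer) per input frame; close() flushes the final group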
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index c140a49..66eed9d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -171,29 +171,6 @@
return new AggregateState(new Integer[]{0, 0});
}
- @Override
- public int getBinaryStateLength(IFrameTupleAccessor accessor,
- int tIndex, AggregateState state)
- throws HyracksDataException {
- if(useObjectState){
- return 0;
- } else {
- return 8;
- }
- }
-
- @Override
- public int getPartialResultLength(byte[] data, int offset,
- AggregateState state) throws HyracksDataException {
- return 8;
- }
-
- @Override
- public int getFinalResultLength(byte[] data, int offset,
- AggregateState state) throws HyracksDataException {
- return 4;
- }
-
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index c6d4e99..cc30641 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -140,29 +140,6 @@
public boolean needsBinaryState() {
return !useObjectState;
}
-
- @Override
- public int getBinaryStateLength(IFrameTupleAccessor accessor,
- int tIndex, AggregateState state)
- throws HyracksDataException {
- if(useObjectState){
- return 0;
- } else {
- return 8;
- }
- }
-
- @Override
- public int getPartialResultLength(byte[] data, int offset,
- AggregateState state) throws HyracksDataException {
- return 8;
- }
-
- @Override
- public int getFinalResultLength(byte[] data, int offset,
- AggregateState state) throws HyracksDataException {
- return 4;
- }
@Override
public AggregateState createState() {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index e36a168..e320603 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -144,26 +144,6 @@
state.state = count;
}
}
-
- @Override
- public int getBinaryStateLength(IFrameTupleAccessor accessor,
- int tIndex, AggregateState state) {
- if(useObjectState)
- return 0;
- return 4;
- }
-
- @Override
- public int getPartialResultLength(byte[] data, int offset,
- AggregateState state) {
- return 4;
- }
-
- @Override
- public int getFinalResultLength(byte[] data, int offset,
- AggregateState state) {
- return 4;
- }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 270ada0..a3c0aa9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -164,26 +164,6 @@
state.state = sum;
}
}
-
- @Override
- public int getBinaryStateLength(IFrameTupleAccessor accessor,
- int tIndex, AggregateState state) {
- if (useObjectState)
- return 0;
- return 4;
- }
-
- @Override
- public int getPartialResultLength(byte[] data, int offset,
- AggregateState state) {
- return 4;
- }
-
- @Override
- public int getFinalResultLength(byte[] data, int offset,
- AggregateState state) {
- return 4;
- }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index b8656da..9c6ff68 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -215,80 +215,6 @@
return new AggregateState();
}
- @Override
- public int getBinaryStateLength(IFrameTupleAccessor accessor,
- int tIndex, AggregateState state) {
- int len = 0;
-
- if (hasBinaryState) {
- len = 4;
- }
-
- return len;
- }
-
- @Override
- public int getPartialResultLength(byte[] data, int offset,
- AggregateState state) throws HyracksDataException {
- int len = 0;
-
- if (hasBinaryState) {
- int stateIdx = IntegerSerializerDeserializer.getInt(data,
- offset);
-
- Object[] storedState = (Object[]) state.state;
-
- len = getUTFLength(((String) (storedState[stateIdx])));
- } else {
- len = getUTFLength((String) (state.state));
- }
-
- return len;
- }
-
- @Override
- public int getFinalResultLength(byte[] data, int offset,
- AggregateState state) throws HyracksDataException {
- int len = 0;
-
- if (hasBinaryState) {
- int stateIdx = IntegerSerializerDeserializer.getInt(data,
- offset);
-
- Object[] storedState = (Object[]) state.state;
-
- len = getUTFLength(((String) (storedState[stateIdx])));
- } else {
- len = getUTFLength((String) (state.state));
- }
-
- return len;
- }
-
- private int getUTFLength(String str) throws HyracksDataException {
- int utflen = 0;
- int c = 0;
-
- /* use charAt instead of copying String to char array */
- for (int i = 0; i < str.length(); i++) {
- c = str.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- utflen++;
- } else if (c > 0x07FF) {
- utflen += 3;
- } else {
- utflen += 2;
- }
- }
-
- if (utflen > 65535) {
- throw new HyracksDataException("encoded string too long: "
- + utflen + " bytes");
- }
-
- return utflen + 2;
- }
-
};
}
}
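The deleted getUTFLength() bookkeeping is no longer needed: under the builder-based contract the string is serialized straight into the tuple builder's DataOutput, which computes the modified-UTF-8 length itself and enforces the same 65535-byte limit the removed check did. A hedged sketch of the field aggregator's output path (object-state case, illustrative only):

    @Override
    public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset,
            AggregateState state) throws HyracksDataException {
        try {
            fieldOutput.writeUTF((String) state.state); // 2-byte length + UTF-8 bytes
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }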
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index c1d67fe..81eb45f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.FrameToolsForGroupers;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
@@ -74,18 +73,6 @@
this.keys = keyFields;
}
- int stateTupleFieldCount = keys.length;
- for (int i = 0; i < aggregators.length; i++) {
- if (aggregators[i].needsBinaryState()) {
- stateTupleFieldCount++;
- }
- }
-
- final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(
- stateTupleFieldCount);
-
- final ArrayTupleBuilder resultTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
return new IAggregatorDescriptor() {
@@ -97,16 +84,16 @@
}
@Override
- public void outputPartialResult(byte[] buf, int offset,
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- resultTupleBuilder.reset();
+ tupleBuilder.reset();
for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- resultTupleBuilder.addField(accessor, tIndex,
+ tupleBuilder.addField(accessor, tIndex,
keyFieldsInPartialResults[i]);
}
- DataOutput dos = resultTupleBuilder.getDataOutput();
+ DataOutput dos = tupleBuilder.getDataOutput();
int tupleOffset = accessor.getTupleStartOffset(tIndex);
for (int i = 0; i < aggregators.length; i++) {
@@ -117,29 +104,24 @@
fieldOffset + accessor.getFieldSlotsLength()
+ tupleOffset, ((AggregateState[]) state
.state)[i]);
- resultTupleBuilder.addFieldEndOffset();
+ tupleBuilder.addFieldEndOffset();
}
- if (buf != null)
- FrameToolsForGroupers.writeFields(buf, offset, this
- .getPartialOutputLength(accessor, tIndex, state),
- resultTupleBuilder);
-
}
@Override
- public void outputFinalResult(byte[] buf, int offset,
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- resultTupleBuilder.reset();
+ tupleBuilder.reset();
for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- resultTupleBuilder.addField(accessor, tIndex,
+ tupleBuilder.addField(accessor, tIndex,
keyFieldsInPartialResults[i]);
}
- DataOutput dos = resultTupleBuilder.getDataOutput();
+ DataOutput dos = tupleBuilder.getDataOutput();
int tupleOffset = accessor.getTupleStartOffset(tIndex);
for (int i = 0; i < aggregators.length; i++) {
@@ -155,99 +137,28 @@
aggregators[i].outputFinalResult(dos, null, 0,
((AggregateState[]) state.state)[i]);
}
- resultTupleBuilder.addFieldEndOffset();
+ tupleBuilder.addFieldEndOffset();
}
-
- if (buf != null)
- FrameToolsForGroupers.writeFields(buf, offset,
- this.getFinalOutputLength(accessor, tIndex, state),
- resultTupleBuilder);
}
@Override
- public int getPartialOutputLength(IFrameTupleAccessor accessor,
- int tIndex, AggregateState state)
- throws HyracksDataException {
- int stateLength = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
-
- for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- stateLength += accessor.getFieldLength(tIndex,
- keyFieldsInPartialResults[i]);
- // add length for slot offset
- stateLength += 4;
- }
-
- for (int i = 0; i < aggregators.length; i++) {
- int fieldOffset = 0;
- if (aggregators[i].needsBinaryState()) {
- fieldOffset = accessor.getFieldStartOffset(tIndex,
- keys.length + i);
- }
- stateLength += aggregators[i].getPartialResultLength(
- accessor.getBuffer().array(), tupleOffset
- + accessor.getFieldSlotsLength()
- + fieldOffset,
- ((AggregateState[]) state.state)[i]);
- // add length for slot offset
- stateLength += 4;
- }
- return stateLength;
- }
-
- @Override
- public int getFinalOutputLength(IFrameTupleAccessor accessor,
- int tIndex, AggregateState state)
- throws HyracksDataException {
- int stateLength = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
-
- for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- stateLength += accessor.getFieldLength(tIndex,
- keyFieldsInPartialResults[i]);
- // add length for slot offset
- stateLength += 4;
- }
-
- for (int i = 0; i < aggregators.length; i++) {
- int fieldOffset = 0;
- if (aggregators[i].needsBinaryState()) {
- fieldOffset = accessor.getFieldStartOffset(tIndex,
- keys.length + i);
- }
- stateLength += aggregators[i].getFinalResultLength(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldOffset, ((AggregateState[]) state
- .state)[i]);
- // add length for slot offset
- stateLength += 4;
- }
- return stateLength;
- }
-
- @Override
- public void init(byte[] buf, int offset,
+ public void init(ArrayTupleBuilder tupleBuilder,
IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
-
- stateTupleBuilder.reset();
+
+ tupleBuilder.reset();
for (int i = 0; i < keys.length; i++) {
- stateTupleBuilder.addField(accessor, tIndex, keys[i]);
+ tupleBuilder.addField(accessor, tIndex, keys[i]);
}
- DataOutput dos = stateTupleBuilder.getDataOutput();
+ DataOutput dos = tupleBuilder.getDataOutput();
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].init(accessor, tIndex, dos,
((AggregateState[]) state.state)[i]);
if (aggregators[i].needsBinaryState()) {
- stateTupleBuilder.addFieldEndOffset();
+ tupleBuilder.addFieldEndOffset();
}
}
- if (buf != null)
- FrameToolsForGroupers.writeFields(buf, offset, this
- .getBinaryAggregateStateLength(accessor, tIndex,
- state), stateTupleBuilder);
}
@Override
@@ -260,31 +171,6 @@
}
@Override
- public int getBinaryAggregateStateLength(
- IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- int stateLength = 0;
-
- for (int i = 0; i < keys.length; i++) {
- stateLength += accessor
- .getFieldLength(tIndex, keys[i]);
- // add length for slot offset
- stateLength += 4;
- }
-
- for (int i = 0; i < aggregators.length; i++) {
- if (aggregators[i].needsBinaryState()) {
- stateLength += aggregators[i].getBinaryStateLength(
- accessor, tIndex,
- ((AggregateState[]) state.state)[i]);
- // add length for slot offset
- stateLength += 4;
- }
- }
- return stateLength;
- }
-
- @Override
public void close() {
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].close();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 717cea5..042c919 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -59,1131 +59,1252 @@
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
/**
*
*/
public class AggregationTests extends AbstractIntegrationTest {
- final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
- new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/lineitem.tbl"))) });
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/lineitem.tbl"))) });
- final RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ final RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
- new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE,
- IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE, }, '|');
+ final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
+ new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|');
- private AbstractSingleActivityOperatorDescriptor getPrinter(
- JobSpecification spec, String prefix) throws IOException {
+ private AbstractSingleActivityOperatorDescriptor getPrinter(
+ JobSpecification spec, String prefix) throws IOException {
- AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
- spec, new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit(NC1_ID, createTempFile()
- .getAbsolutePath()),
- new FileSplit(NC2_ID, createTempFile()
- .getAbsolutePath()) }), "\t");
+ AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
+ spec, new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC1_ID, createTempFile()
+ .getAbsolutePath()),
+ new FileSplit(NC2_ID, createTempFile()
+ .getAbsolutePath()) }), "\t");
- return printer;
- }
+ return printer;
+ }
- @Test
- public void singleKeySumInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ @Test
+ public void singleKeySumInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 0 };
- int tableSize = 8;
+ int[] keyFields = new int[] { 0 };
+ int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true) }),
- outputRec, tableSize);
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
+ outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeySumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeySumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- @Test
- public void singleKeySumPreClusterGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ @Test
+ public void singleKeySumPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 0 };
+ int[] keyFields = new int[] { 0 };
+
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec,
+ 4,
+ keyFields,
+ null,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ desc);
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true) }),
- outputRec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+
+ spec.connect(conn0, csvScanner, 0, sorter, 0);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
+ outputRec);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeySumInmemGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, sorter, 0, grouper, 0);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeySumInmemGroupTest");
- spec.addRoot(printer);
- runTest(spec);
- }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- @Test
- public void singleKeySumExtGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ @Test
+ public void singleKeySumExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- int[] keyFields = new int[] { 0 };
- int frameLimits = 4;
- int tableSize = 8;
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(3, false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(2, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
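+ // The two aggregator factories split the work: the first builds in-memory partial
+ // aggregates from input fields 1 and 3; the second merges spilled partials, so its
+ // field indices (1, 2) refer to the partial-result layout, not the raw input.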
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeySumExtGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeySumExtGroupTest");
- spec.addRoot(printer);
- runTest(spec);
- }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- @Test
- public void singleKeyAvgInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ @Test
+ public void singleKeyAvgInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- int[] keyFields = new int[] { 0 };
- int tableSize = 8;
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec, tableSize);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ int[] keyFields = new int[] { 0 };
+ int tableSize = 8;
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
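+ // In-memory hash group-by on field 0: per group, the sum of field 1, a tuple
+ // count, and the average of field 1.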
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec, tableSize);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
- spec.addRoot(printer);
- runTest(spec);
- }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- @Test
- public void singleKeyAvgPreClusterGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ @Test
+ public void singleKeyAvgPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- int[] keyFields = new int[] { 0 };
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ int[] keyFields = new int[] { 0 };
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
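+ // The pre-clustered grouper assumes key-clustered input, so an external sort on
+ // the key runs first.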
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec,
+ 4,
+ keyFields,
+ null,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ desc);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+
+ spec.connect(conn0, csvScanner, 0, sorter, 0);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec);
- spec.addRoot(printer);
- runTest(spec);
- }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- @Test
- public void singleKeyAvgExtGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, sorter, 0, grouper, 0);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- int[] keyFields = new int[] { 0 };
- int frameLimits = 4;
- int tableSize = 8;
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new CountFieldAggregatorFactory(false),
- new AvgFieldGroupAggregatorFactory(1, false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(2, false),
- new AvgFieldMergeAggregatorFactory(3, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
+ @Test
+ public void singleKeyAvgExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgExtGroupTest");
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(1, false) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false),
+ new AvgFieldMergeAggregatorFactory(3, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- spec.addRoot(printer);
- runTest(spec);
- }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- @Test
- public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- int[] keyFields = new int[] { 0 };
- int tableSize = 8;
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15,
- true, false) }), outputRec, tableSize);
+ @Test
+ public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ int[] keyFields = new int[] { 0 };
+ int tableSize = 8;
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
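+ // Per group: the sum of field 1 plus a min/max aggregate over the string field
+ // at index 15.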
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec, tableSize);
- spec.addRoot(printer);
- runTest(spec);
- }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- @Test
- public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- int[] keyFields = new int[] { 0 };
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15,
- true, false) }), outputRec);
+ @Test
+ public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgInmemGroupTest");
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ int[] keyFields = new int[] { 0 };
+
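+ // Sort by the grouping key ahead of the sort-based (pre-clustered) grouper.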
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec,
+ 4,
+ keyFields,
+ null,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ desc);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+ NC2_ID, NC1_ID);
- spec.addRoot(printer);
- runTest(spec);
- }
+ IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+
+ spec.connect(conn0, csvScanner, 0, sorter, 0);
- @Test
- public void singleKeyMinMaxStringExtGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, sorter, 0, grouper, 0);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgInmemGroupTest");
- int[] keyFields = new int[] { 0 };
- int frameLimits = 4;
- int tableSize = 8;
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(15,
- true, true) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(2, true,
- true) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyMinMaxStringExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
+
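+ // In the merge-side factory the min/max string is read from field 2 of the
+ // (key, sum, string) partial layout rather than from raw input field 15.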
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, true) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(2, true,
+ true) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "singleKeyAvgExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ @Test
+ public void multiKeySumInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "singleKeyAvgExtGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ int[] keyFields = new int[] { 8, 0 };
+ int tableSize = 8;
- spec.addRoot(printer);
- runTest(spec);
- }
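+ // Composite grouping key (fields 8 and 0): the hash partitioner and the
+ // comparators both cover the two key fields.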
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
+ outputRec, tableSize);
- @Test
- public void multiKeySumInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeySumInmemGroupTest");
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- int[] keyFields = new int[] { 8, 0 };
- int tableSize = 8;
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true) }),
- outputRec, tableSize);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ @Test
+ public void multiKeySumPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeySumInmemGroupTest");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ int[] keyFields = new int[] { 8, 0 };
+
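+ // Sort on both key fields so the composite key arrives clustered at the grouper.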
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec,
+ 4,
+ keyFields,
+ null,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ desc);
- spec.addRoot(printer);
- runTest(spec);
- }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+ NC2_ID, NC1_ID);
- @Test
- public void multiKeySumPreClusterGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+
+ spec.connect(conn0, csvScanner, 0, sorter, 0);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec, keyFields, new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
+ outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, sorter, 0, grouper, 0);
- int[] keyFields = new int[] { 8, 0 };
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeySumInmemGroupTest");
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec, keyFields, new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true) }),
- outputRec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeySumInmemGroupTest");
+ @Test
+ public void multiKeySumExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- spec.addRoot(printer);
- runTest(spec);
- }
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- @Test
- public void multiKeySumExtGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ int[] keyFields = new int[] { 8, 0 };
+ int[] keys = new int[] { 0, 1 };
+ int frameLimits = 4;
+ int tableSize = 8;
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
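+ // keyFields (8, 0) index the raw input; keys (0, 1) index the same columns in the
+ // partial-result layout, which the second (merge-side) aggregator factory consumes.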
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(keyFields,
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
+ new MultiFieldsAggregatorFactory(keys,
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- int[] keyFields = new int[] { 8, 0 };
- int[] keys = new int[] {0, 1};
- int frameLimits = 4;
- int tableSize = 8;
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeySumExtGroupTest");
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(keyFields,
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new IntSumFieldAggregatorFactory(3, false) }),
- new MultiFieldsAggregatorFactory(keys,
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(2, false),
- new IntSumFieldAggregatorFactory(3, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeySumExtGroupTest");
+ @Test
+ public void multiKeyAvgInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- spec.addRoot(printer);
- runTest(spec);
- }
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- @Test
- public void multiKeyAvgInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ int[] keyFields = new int[] { 8, 0 };
+ int tableSize = 8;
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- int[] keyFields = new int[] { 8, 0 };
- int tableSize = 8;
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyAvgInmemGroupTest");
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec, tableSize);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyAvgInmemGroupTest");
+ @Test
+ public void multiKeyAvgPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- spec.addRoot(printer);
- runTest(spec);
- }
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- @Test
- public void multiKeyAvgPreClusterGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ int[] keyFields = new int[] { 8, 0 };
+
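+ // Hash-partition and sort on the composite key before the pre-clustered grouper.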
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec,
+ 4,
+ keyFields,
+ null,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ desc);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+ NC2_ID, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+
+ spec.connect(conn0, csvScanner, 0, sorter, 0);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec, keyFields, new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldGroupAggregatorFactory(1, true) }),
+ outputRec);
- int[] keyFields = new int[] { 8, 0 };
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec, keyFields, new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new CountFieldAggregatorFactory(true),
- new AvgFieldGroupAggregatorFactory(1, true) }),
- outputRec);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, sorter, 0, grouper, 0);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyAvgInmemGroupTest");
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyAvgInmemGroupTest");
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ @Test
+ public void multiKeyAvgExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- spec.addRoot(printer);
- runTest(spec);
- }
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- @Test
- public void multiKeyAvgExtGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ int[] keyFields = new int[] { 8, 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
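+ // Merge side of the external group-by: sums the partial sum (field 2) and count
+ // (field 3), and merges the partial average state at field 4 of the
+ // (key, key, sum, count, avg) partial layout.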
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(1, false) }),
+ new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new IntSumFieldAggregatorFactory(3, false),
+ new AvgFieldMergeAggregatorFactory(4, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- int[] keyFields = new int[] { 8, 0 };
- int frameLimits = 4;
- int tableSize = 8;
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new CountFieldAggregatorFactory(false),
- new AvgFieldGroupAggregatorFactory(1, false) }),
- new MultiFieldsAggregatorFactory(new int[]{0, 1},
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(2, false),
- new IntSumFieldAggregatorFactory(3, false),
- new AvgFieldMergeAggregatorFactory(4, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyAvgExtGroupTest");
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyAvgExtGroupTest");
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ @Test
+ public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- spec.addRoot(printer);
- runTest(spec);
- }
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- @Test
- public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ int[] keyFields = new int[] { 8, 0 };
+ int tableSize = 8;
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec, tableSize);
- int[] keyFields = new int[] { 8, 0 };
- int tableSize = 8;
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15,
- true, false) }), outputRec, tableSize);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyMinMaxStringInmemGroupTest");
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyMinMaxStringInmemGroupTest");
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ @Test
+ public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- spec.addRoot(printer);
- runTest(spec);
- }
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- @Test
- public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ int[] keyFields = new int[] { 8, 0 };
+
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec,
+ 4,
+ keyFields,
+ null,
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ desc);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
+ NC2_ID, NC1_ID);
- int[] keyFields = new int[] { 8, 0 };
+ IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+
+ spec.connect(conn0, csvScanner, 0, sorter, 0);
- PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
- spec, keyFields, new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new MinMaxStringFieldAggregatorFactory(15,
- true, false) }), outputRec);
+ PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
+ spec, keyFields, new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, false) }), outputRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, sorter, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyMinMaxStringPreClusterGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyMinMaxStringPreClusterGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
- @Test
- public void multiKeyMinMaxStringExtGroupTest() throws Exception {
- JobSpecification spec = new JobSpecification();
+ @Test
+ public void multiKeyMinMaxStringExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, NC2_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, NC2_ID);
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- int[] keyFields = new int[] { 8, 0 };
- int frameLimits = 4;
- int tableSize = 8;
+ int[] keyFields = new int[] { 8, 0 };
+ int frameLimits = 4;
+ int tableSize = 8;
- ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] {
- UTF8StringBinaryComparatorFactory.INSTANCE,
- UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(15,
- true, true) }),
- new MultiFieldsAggregatorFactory(new int[] {0, 1},
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(2, false),
- new MinMaxStringFieldAggregatorFactory(3, true,
- true) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- tableSize), true);
+ ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] {
+ UTF8StringBinaryComparatorFactory.INSTANCE,
+ UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15,
+ true, true) }),
+ new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new MinMaxStringFieldAggregatorFactory(3, true,
+ true) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ tableSize), true);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
- spec, new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] {
- UTF8StringBinaryHashFunctionFactory.INSTANCE,
- UTF8StringBinaryHashFunctionFactory.INSTANCE }));
- spec.connect(conn1, csvScanner, 0, grouper, 0);
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+ spec, new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ UTF8StringBinaryHashFunctionFactory.INSTANCE,
+ UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "multiKeyMinMaxStringExtGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "multiKeyMinMaxStringExtGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC2_ID, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC2_ID, NC1_ID);
- IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
- spec.connect(conn2, grouper, 0, printer, 0);
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
+ spec.addRoot(printer);
+ runTest(spec);
+ }
}
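
For reference, every test added above wires the same pipeline: scan, hash-partition on the group keys, group, then print co-located with the grouper. A minimal sketch of that shared shape follows; the helper name runGroupTest and its parameter list are illustrative only (no such helper exists in this patch), and it assumes the fixture fields (NC1_ID, NC2_ID) and helpers (getPrinter, runTest) already defined in this test class, plus its existing imports.

    private void runGroupTest(JobSpecification spec,
            FileScanOperatorDescriptor csvScanner, IOperatorDescriptor grouper,
            int[] keyFields, String testName) throws Exception {
        // Route each tuple to the partition owning its group by hashing the
        // two UTF8 string key columns, as the tests above do for conn1.
        IConnectorDescriptor scanToGroup = new MToNPartitioningConnectorDescriptor(
                spec, new FieldHashPartitionComputerFactory(keyFields,
                        new IBinaryHashFunctionFactory[] {
                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
        spec.connect(scanToGroup, csvScanner, 0, grouper, 0);

        // The printer runs on the same node set as the grouper and consumes
        // its output one-to-one per partition.
        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
                testName);
        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
                NC2_ID, NC1_ID);
        spec.connect(new OneToOneConnectorDescriptor(spec), grouper, 0,
                printer, 0);

        spec.addRoot(printer);
        runTest(spec);
    }

Note the field-index convention in the external-group tests: the partial (group-side) aggregators read source columns (for example column 1 for the int sum and column 15 for the min/max string), while the merge-side aggregators read the intermediate record, whose aggregate values start right after the two key columns (hence indices 2 and 3, and index 4 for AvgFieldMergeAggregatorFactory, which combines the partial sum and count produced by AvgFieldGroupAggregatorFactory).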