Major changes:
- Based on Yingyi's review comments, the new interface moves the frame output logic out of the aggregators.
- Added methods for better management of the size of aggregation states. Aggregator developers can now decide how much memory to use, and the groupers will assign memory/frame space based on this information.
- Fixed some bugs in the multiple-field aggregator.
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@971 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
index 275bc56..9bb54db 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
@@ -24,6 +24,14 @@
private static final long serialVersionUID = 1L;
Object state = null;
+
+ public AggregateState(){
+ state = null;
+ }
+
+ public AggregateState(Object obj){
+ state = obj;
+ }
public void setState(Object obj) {
state = null;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 971af1a..a0a2dd2 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -40,7 +40,6 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
@@ -275,13 +274,14 @@
}
int[] keyFieldsInPartialResults = new int[keyFields.length];
- for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
keyFieldsInPartialResults[i] = i;
}
-
+
final IAggregatorDescriptor aggregator = mergerFactory
.createAggregator(ctx, recordDescriptors[0],
- recordDescriptors[0], keyFields, keyFieldsInPartialResults);
+ recordDescriptors[0], keyFields,
+ keyFieldsInPartialResults);
final AggregateState aggregateState = aggregator
.createAggregateStates();
@@ -303,6 +303,7 @@
* Output frame.
*/
private ByteBuffer outFrame, writerFrame;
+ private int outFrameOffset, writerFrameOffset;
private LinkedList<RunFileReader> runs;
@@ -315,11 +316,9 @@
private int[] currentFrameIndexInRun;
private int[] currentRunFrames;
- private final FrameTupleAppender outFrameAppender = new FrameTupleAppender(
- ctx.getFrameSize());
+
private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
ctx.getFrameSize(), recordDescriptors[0]);
- private FrameTupleAppender writerFrameAppender;
public void initialize() throws HyracksDataException {
aggState = (AggregateActivityState) ctx
@@ -340,7 +339,7 @@
runs = new LinkedList<RunFileReader>(runs);
inFrames = new ArrayList<ByteBuffer>();
outFrame = ctx.allocateFrame();
- outFrameAppender.reset(outFrame, true);
+ outFrameOffset = 0;
outFrameAccessor.reset(outFrame);
while (runs.size() > 0) {
try {
@@ -427,7 +426,8 @@
runFileReaders, tupleAccessors,
topTuples);
} else {
- closeRun(runIndex, runFileReaders, tupleAccessors);
+ closeRun(runIndex, runFileReaders,
+ tupleAccessors);
break;
}
}
@@ -455,16 +455,32 @@
* Initialize the first output record Reset the
* tuple builder
*/
- if (!aggregator.init(outFrameAppender, fta,
- tupleIndex, aggregateState)) {
+
+ int stateLength = aggregator
+ .getBinaryAggregateStateLength(fta,
+ tupleIndex, aggregateState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ outFrame, stateLength, (outFrameOffset == 0))) {
flushOutFrame(writer, finalPass);
- if (!aggregator.init(outFrameAppender, fta,
- tupleIndex, aggregateState)) {
+ if (FrameToolsForGroupers
+ .isFrameOverflowing(outFrame,
+ stateLength, (outFrameOffset == 0))) {
throw new HyracksDataException(
- "Failed to append an aggregation result to the output frame.");
+ "The partial result is too large to be initialized in a frame.");
}
}
+ aggregator.init(outFrame.array(),
+ outFrameOffset, fta, tupleIndex,
+ aggregateState);
+
+ FrameToolsForGroupers
+ .updateFrameMetaForNewTuple(outFrame,
+ stateLength, (outFrameOffset == 0));
+
+ outFrameOffset += stateLength;
+
} else {
/**
* if new tuple is in the same group of the
@@ -482,12 +498,12 @@
runFileReaders, tupleAccessors, topTuples);
}
- if (outFrameAppender.getTupleCount() > 0) {
+ if (outFrameOffset > 0) {
flushOutFrame(writer, finalPass);
}
aggregator.close();
-
+
runs.subList(0, runNumber).clear();
/**
* insert the new run file into the beginning of the run
@@ -507,41 +523,66 @@
throws HyracksDataException {
if (writerFrame == null) {
writerFrame = ctx.allocateFrame();
+ writerFrameOffset = 0;
}
- if (writerFrameAppender == null) {
- writerFrameAppender = new FrameTupleAppender(
- ctx.getFrameSize());
- writerFrameAppender.reset(writerFrame, true);
- }
+
outFrameAccessor.reset(outFrame);
for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-
- if(isFinal){
- if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+
+ int outputLen;
+
+ if (isFinal) {
+
+ outputLen = aggregator.getFinalOutputLength(
+ outFrameAccessor, i, aggregateState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ writerFrame, outputLen, (writerFrameOffset == 0))) {
FrameUtils.flushFrame(writerFrame, writer);
- writerFrameAppender.reset(writerFrame, true);
- if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+ writerFrameOffset = 0;
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ writerFrame, outputLen, (writerFrameOffset == 0))) {
throw new HyracksDataException(
- "Failed to write final aggregation result to a writer frame!");
+ "Final aggregation output is too large to be fit into a frame.");
}
}
+
+ aggregator.outputFinalResult(writerFrame.array(),
+ writerFrameOffset, outFrameAccessor, i,
+ aggregateState);
+
} else {
- if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+
+ outputLen = aggregator.getPartialOutputLength(
+ outFrameAccessor, i, aggregateState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ writerFrame, outputLen, (writerFrameOffset == 0))) {
FrameUtils.flushFrame(writerFrame, writer);
- writerFrameAppender.reset(writerFrame, true);
- if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+ writerFrameOffset = 0;
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ writerFrame, outputLen, (writerFrameOffset == 0))) {
throw new HyracksDataException(
- "Failed to write final aggregation result to a writer frame!");
+ "Final aggregation output is too large to be fit into a frame.");
}
}
+
+ aggregator.outputPartialResult(writerFrame.array(),
+ writerFrameOffset, outFrameAccessor, i,
+ aggregateState);
}
+
+ FrameToolsForGroupers.updateFrameMetaForNewTuple(
+ writerFrame, outputLen, (writerFrameOffset == 0));
+
+ writerFrameOffset += outputLen;
}
- if (writerFrameAppender.getTupleCount() > 0) {
+ if (writerFrameOffset > 0) {
FrameUtils.flushFrame(writerFrame, writer);
- writerFrameAppender.reset(writerFrame, true);
+ writerFrameOffset = 0;
}
- outFrameAppender.reset(outFrame, true);
+ outFrameOffset = 0;
}
private void setNextTopTuple(int runIndex, int[] tupleIndices,
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
new file mode 100644
index 0000000..ee29c56
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ *
+ */
+public class FrameToolsForGroupers {
+
+ public static void writeFields(byte[] buf, int offset, int length,
+ ArrayTupleBuilder tupleBuilder) throws HyracksDataException {
+ writeFields(buf, offset, length, tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
+ }
+
+ public static void writeFields(byte[] buf, int offset, int length,
+ int[] fieldsOffset, byte[] data, int dataOffset, int dataLength)
+ throws HyracksDataException {
+ if (dataLength + 4 * fieldsOffset.length > length) {
+ throw new HyracksDataException(
+ "Out of buffer bound: try to write too much data ("
+ + dataLength + ") to the given bound (" + length
+ + ").");
+ }
+
+ ByteBuffer buffer = ByteBuffer.wrap(buf, offset, length);
+ for (int i = 0; i < fieldsOffset.length; i++) {
+ buffer.putInt(fieldsOffset[i]);
+ }
+ buffer.put(data, dataOffset, dataLength);
+ }
+
+ public static void updateFrameMetaForNewTuple(ByteBuffer buffer,
+ int addedTupleLength) throws HyracksDataException {
+ int currentTupleCount = buffer.getInt(FrameHelper
+ .getTupleCountOffset(buffer.capacity()));
+ int currentTupleEndOffset = buffer
+ .getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+ * currentTupleCount);
+ int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
+
+ // update tuple end offset
+ buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+ * (currentTupleCount + 1), newTupleEndOffset);
+ // Update the tuple count
+ buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()),
+ currentTupleCount + 1);
+ }
+
+ public static void updateFrameMetaForNewTuple(ByteBuffer buffer,
+ int addedTupleLength, boolean isReset) throws HyracksDataException {
+ int currentTupleCount;
+ int currentTupleEndOffset;
+ if (isReset) {
+ currentTupleCount = 0;
+ currentTupleEndOffset = 0;
+ } else {
+ currentTupleCount = buffer.getInt(FrameHelper
+ .getTupleCountOffset(buffer.capacity()));
+ currentTupleEndOffset = buffer.getInt(FrameHelper
+ .getTupleCountOffset(buffer.capacity())
+ - 4
+ * currentTupleCount);
+ }
+ int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
+
+ // update tuple end offset
+ buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+ * (currentTupleCount + 1), newTupleEndOffset);
+ // Update the tuple count
+ buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()),
+ currentTupleCount + 1);
+ }
+
+ public static boolean isFrameOverflowing(ByteBuffer buffer, int length, boolean isReset)
+ throws HyracksDataException {
+
+ int currentTupleCount = buffer.getInt(FrameHelper
+ .getTupleCountOffset(buffer.capacity()));
+ if(currentTupleCount == 0 || isReset){
+ return length + 4 + 4 > buffer.capacity();
+ }
+ int currentTupleEndOffset = buffer
+ .getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+ * currentTupleCount);
+ return currentTupleEndOffset + length + 4 + (currentTupleCount + 1) * 4 > buffer
+ .capacity();
+ }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
index b1af426..853ae36 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
@@ -29,7 +29,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
class GroupingHashTable {
@@ -62,7 +61,7 @@
private static final int INIT_AGG_STATE_SIZE = 8;
private final IHyracksTaskContext ctx;
- private final FrameTupleAppender appender;
+
private final List<ByteBuffer> buffers;
private final Link[] table;
/**
@@ -79,6 +78,9 @@
private final ITuplePartitionComputer tpc;
private final IAggregatorDescriptor aggregator;
+ private int bufferOffset;
+ private int tupleCountInBuffer;
+
private final FrameTupleAccessor storedKeysAccessor;
GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
@@ -89,7 +91,7 @@
RecordDescriptor outRecordDescriptor, int tableSize)
throws HyracksDataException {
this.ctx = ctx;
- appender = new FrameTupleAppender(ctx.getFrameSize());
+
buffers = new ArrayList<ByteBuffer>();
table = new Link[tableSize];
@@ -109,12 +111,13 @@
tpc = tpcf.createPartitioner();
int[] keyFieldsInPartialResults = new int[fields.length];
- for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
keyFieldsInPartialResults[i] = i;
}
-
+
this.aggregator = aggregatorFactory.createAggregator(ctx,
- inRecordDescriptor, outRecordDescriptor, fields, keyFieldsInPartialResults);
+ inRecordDescriptor, outRecordDescriptor, fields,
+ keyFieldsInPartialResults);
this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
accumulatorSize = 0;
@@ -132,19 +135,11 @@
buffer.position(0);
buffer.limit(buffer.capacity());
buffers.add(buffer);
- appender.reset(buffer, true);
+ bufferOffset = 0;
+ tupleCountInBuffer = 0;
++lastBIndex;
}
- private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
- throws HyracksDataException {
- ByteBuffer frame = appender.getBuffer();
- frame.position(0);
- frame.limit(frame.capacity());
- writer.nextFrame(appender.getBuffer());
- appender.reset(appender.getBuffer(), true);
- }
-
void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
int entry = tpc.partition(accessor, tIndex, table.length);
Link link = table[entry];
@@ -167,22 +162,32 @@
saIndex = accumulatorSize++;
// Add keys
- // Add index to the keys in frame
- int sbIndex = lastBIndex;
- int stIndex = appender.getTupleCount();
-
// Add aggregation fields
AggregateState newState = aggregator.createAggregateStates();
-
- if(!aggregator.init(appender, accessor, tIndex, newState)){
+
+ int initLength = aggregator.getBinaryAggregateStateLength(accessor,
+ tIndex, newState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ buffers.get(lastBIndex), initLength, (bufferOffset == 0))) {
addNewBuffer();
- sbIndex = lastBIndex;
- stIndex = appender.getTupleCount();
- if(!aggregator.init(appender, accessor, tIndex, newState)){
- throw new IllegalStateException();
+ if (bufferOffset + initLength > ctx.getFrameSize()) {
+ throw new HyracksDataException(
+ "Cannot initialize the aggregation state within a frame.");
}
}
-
+
+ aggregator.init(buffers.get(lastBIndex).array(), bufferOffset,
+ accessor, tIndex, newState);
+
+ FrameToolsForGroupers.updateFrameMetaForNewTuple(
+ buffers.get(lastBIndex), initLength, (bufferOffset == 0));
+
+ bufferOffset += initLength;
+
+ // Update tuple count in frame
+ tupleCountInBuffer++;
+
if (accumulatorSize >= aggregateStates.length) {
aggregateStates = Arrays.copyOf(aggregateStates,
aggregateStates.length * 2);
@@ -190,7 +195,8 @@
aggregateStates[saIndex] = newState;
- link.add(sbIndex, stIndex, saIndex);
+ link.add(lastBIndex, tupleCountInBuffer - 1, saIndex);
+
} else {
aggregator.aggregate(accessor, tIndex, null, 0,
aggregateStates[saIndex]);
@@ -199,7 +205,8 @@
void write(IFrameWriter writer) throws HyracksDataException {
ByteBuffer buffer = ctx.allocateFrame();
- appender.reset(buffer, true);
+ int bufOffset = 0;
+
for (int i = 0; i < table.length; ++i) {
Link link = table[i];
if (link != null) {
@@ -210,21 +217,42 @@
ByteBuffer keyBuffer = buffers.get(bIndex);
storedKeysAccessor.reset(keyBuffer);
- while (!aggregator
- .outputFinalResult(appender, storedKeysAccessor,
- tIndex, aggregateStates[aIndex])) {
- flushFrame(appender, writer);
+ int outputLen = aggregator
+ .getFinalOutputLength(storedKeysAccessor, tIndex,
+ aggregateStates[aIndex]);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(buffer,
+ outputLen, (bufOffset == 0))) {
+ writer.nextFrame(buffer);
+ bufOffset = 0;
+ if (FrameToolsForGroupers.isFrameOverflowing(buffer,
+ outputLen, (bufOffset == 0))) {
+ throw new HyracksDataException(
+ "Cannot write aggregation output in a frame.");
+ }
}
+
+ aggregator
+ .outputFinalResult(buffer.array(), bufOffset,
+ storedKeysAccessor, tIndex,
+ aggregateStates[aIndex]);
+
+ FrameToolsForGroupers.updateFrameMetaForNewTuple(buffer,
+ outputLen, (bufOffset == 0));
+
+ bufOffset += outputLen;
+
}
}
}
- if (appender.getTupleCount() != 0) {
- flushFrame(appender, writer);
+ if (bufOffset != 0) {
+ writer.nextFrame(buffer);
+ bufOffset = 0;
}
}
-
+
void close() throws HyracksDataException {
- for(AggregateState aState : aggregateStates){
+ for (AggregateState aState : aggregateStates) {
aState.close();
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index af68afb..5cff298 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -31,7 +31,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -116,19 +115,17 @@
keyFields, storedKeys, comparators);
final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(
storedKeys, storedKeys, comparators);
- final FrameTupleAppender appender = new FrameTupleAppender(
- ctx.getFrameSize());
+
final ITuplePartitionComputer tpc = tpcf.createPartitioner();
- final ByteBuffer outFrame = ctx.allocateFrame();
final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
: firstKeyNormalizerFactory.createNormalizedKeyComputer();
int[] keyFieldsInPartialResults = new int[keyFields.length];
- for(int i = 0; i < keyFieldsInPartialResults.length; i++){
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
keyFieldsInPartialResults[i] = i;
}
-
+
final IAggregatorDescriptor aggregator = aggregateFactory
.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
keyFields, keyFieldsInPartialResults);
@@ -138,7 +135,12 @@
return new ISpillableTable() {
- private int dataFrameCount;
+ private int lastBufIndex;
+ private int outFrameOffset;
+ private int tupleCountInOutFrame;
+
+ private ByteBuffer outputFrame;
+
private final ISerializableTable table = new SerializableHashTable(
tableSize, ctx);
private final TuplePointer storedTuplePointer = new TuplePointer();
@@ -197,7 +199,7 @@
@Override
public void reset() {
- dataFrameCount = -1;
+ lastBufIndex = -1;
tPointers = null;
table.reset();
aggregator.reset();
@@ -206,7 +208,7 @@
@Override
public boolean insert(FrameTupleAccessor accessor, int tIndex)
throws HyracksDataException {
- if (dataFrameCount < 0)
+ if (lastBufIndex < 0)
nextAvailableFrame();
int entry = tpc.partition(accessor, tIndex, tableSize);
boolean foundGroup = false;
@@ -227,21 +229,27 @@
if (!foundGroup) {
- if (!aggregator.init(appender, accessor, tIndex,
- aggregateState)) {
+ int initLen = aggregator.getBinaryAggregateStateLength(
+ accessor, tIndex, aggregateState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ frames.get(lastBufIndex), initLen, (outFrameOffset == 0))) {
if (!nextAvailableFrame()) {
return false;
- } else {
- if (!aggregator.init(appender, accessor, tIndex,
- aggregateState)) {
- throw new HyracksDataException(
- "Failed to init an aggregator");
- }
}
}
- storedTuplePointer.frameIndex = dataFrameCount;
- storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
+ aggregator.init(frames.get(lastBufIndex).array(),
+ outFrameOffset, accessor, tIndex, aggregateState);
+
+ FrameToolsForGroupers.updateFrameMetaForNewTuple(
+ frames.get(lastBufIndex), initLen, (outFrameOffset == 0));
+
+ outFrameOffset += initLen;
+ tupleCountInOutFrame++;
+
+ storedTuplePointer.frameIndex = lastBufIndex;
+ storedTuplePointer.tupleIndex = tupleCountInOutFrame - 1;
table.insert(entry, storedTuplePointer);
} else {
@@ -259,16 +267,19 @@
@Override
public int getFrameCount() {
- return dataFrameCount;
+ return lastBufIndex;
}
@Override
public void flushFrames(IFrameWriter writer, boolean isPartial)
throws HyracksDataException {
- FrameTupleAppender appender = new FrameTupleAppender(
- ctx.getFrameSize());
+ if (outputFrame == null) {
+ outputFrame = ctx.allocateFrame();
+ }
+
+ int outputFrameOffset = 0;
writer.open();
- appender.reset(outFrame, true);
+
if (tPointers == null) {
// Not sorted
for (int i = 0; i < tableSize; ++i) {
@@ -284,25 +295,64 @@
storedKeysAccessor1.reset(frames.get(bIndex));
+ int outputLen;
+
if (isPartial) {
- while (!aggregator.outputPartialResult(
- appender, storedKeysAccessor1, tIndex,
- aggregateState)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- }
- } else {
- while (!aggregator.outputFinalResult(appender,
+
+ outputLen = aggregator.getPartialOutputLength(
storedKeysAccessor1, tIndex,
- aggregateState)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
+ aggregateState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ outputFrame, outputLen, (outputFrameOffset == 0))) {
+ FrameUtils.flushFrame(outputFrame, writer);
+ outputFrameOffset = 0;
+ if (FrameToolsForGroupers
+ .isFrameOverflowing(outputFrame,
+ outputLen, (outputFrameOffset == 0))) {
+ throw new HyracksDataException(
+ "The output item is too large to be fit into a frame.");
+ }
}
+
+ aggregator.outputPartialResult(
+ outputFrame.array(), outputFrameOffset,
+ storedKeysAccessor1, tIndex,
+ aggregateState);
+
+ } else {
+ outputLen = aggregator.getFinalOutputLength(
+ storedKeysAccessor1, tIndex,
+ aggregateState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ outputFrame, outputLen, (outputFrameOffset == 0))) {
+ FrameUtils.flushFrame(outputFrame, writer);
+ outputFrameOffset = 0;
+ if (FrameToolsForGroupers
+ .isFrameOverflowing(outputFrame,
+ outputLen, (outputFrameOffset == 0))) {
+ throw new HyracksDataException(
+ "The output item is too large to be fit into a frame.");
+ }
+ }
+
+ aggregator.outputFinalResult(
+ outputFrame.array(), outputFrameOffset,
+ storedKeysAccessor1, tIndex,
+ aggregateState);
}
+
+ FrameToolsForGroupers.updateFrameMetaForNewTuple(
+ outputFrame, outputLen, (outputFrameOffset == 0));
+
+ outputFrameOffset += outputLen;
+
} while (true);
}
- if (appender.getTupleCount() != 0) {
- FrameUtils.flushFrame(outFrame, writer);
+ if (outputFrameOffset > 0) {
+ FrameUtils.flushFrame(outputFrame, writer);
+ outputFrameOffset = 0;
}
aggregator.close();
return;
@@ -319,45 +369,65 @@
ByteBuffer buffer = frames.get(frameIndex);
storedKeysAccessor1.reset(buffer);
+ int outputLen;
+
if (isPartial) {
- if (!aggregator
- .outputPartialResult(appender,
- storedKeysAccessor1, tupleIndex,
- aggregateState)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!aggregator.outputPartialResult(appender,
- storedKeysAccessor1, tupleIndex,
- aggregateState)) {
+
+ outputLen = aggregator
+ .getPartialOutputLength(storedKeysAccessor1,
+ tupleIndex, aggregateState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ outputFrame, outputLen, (outputFrameOffset == 0))) {
+ FrameUtils.flushFrame(outputFrame, writer);
+ outputFrameOffset = 0;
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ outputFrame, outputLen, (outputFrameOffset == 0))) {
throw new HyracksDataException(
- "Failed to output partial result.");
+ "The output item is too large to be fit into a frame.");
}
}
+
+ aggregator.outputPartialResult(outputFrame.array(),
+ outputFrameOffset, storedKeysAccessor1,
+ tupleIndex, aggregateState);
+
} else {
- if (!aggregator
- .outputFinalResult(appender,
- storedKeysAccessor1, tupleIndex,
- aggregateState)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!aggregator.outputFinalResult(appender,
- storedKeysAccessor1, tupleIndex,
- aggregateState)) {
+ outputLen = aggregator
+ .getFinalOutputLength(storedKeysAccessor1,
+ tupleIndex, aggregateState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ outputFrame, outputLen, (outputFrameOffset == 0))) {
+ FrameUtils.flushFrame(outputFrame, writer);
+ outputFrameOffset = 0;
+ if (FrameToolsForGroupers.isFrameOverflowing(
+ outputFrame, outputLen, (outputFrameOffset == 0))) {
throw new HyracksDataException(
- "Failed to output partial result.");
+ "The output item is too large to be fit into a frame.");
}
}
+
+ aggregator.outputFinalResult(outputFrame.array(),
+ outputFrameOffset, storedKeysAccessor1,
+ tupleIndex, aggregateState);
}
+
+ FrameToolsForGroupers.updateFrameMetaForNewTuple(
+ outputFrame, outputLen, (outputFrameOffset == 0));
+
+ outputFrameOffset += outputLen;
}
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
+ if (outputFrameOffset > 0) {
+ FrameUtils.flushFrame(outputFrame, writer);
+ outputFrameOffset = 0;
}
aggregator.close();
}
@Override
public void close() {
- dataFrameCount = -1;
+ lastBufIndex = -1;
tPointers = null;
table.close();
frames.clear();
@@ -374,7 +444,7 @@
*/
private boolean nextAvailableFrame() {
// Return false if the number of frames is equal to the limit.
- if (dataFrameCount + 1 >= framesLimit)
+ if (lastBufIndex + 1 >= framesLimit)
return false;
if (frames.size() < framesLimit) {
@@ -383,15 +453,17 @@
frame.position(0);
frame.limit(frame.capacity());
frames.add(frame);
- appender.reset(frame, true);
- dataFrameCount = frames.size() - 1;
+ outFrameOffset = 0;
+ tupleCountInOutFrame = 0;
+ lastBufIndex = frames.size() - 1;
} else {
// Reuse an old frame
- dataFrameCount++;
- ByteBuffer frame = frames.get(dataFrameCount);
+ lastBufIndex++;
+ ByteBuffer frame = frames.get(lastBufIndex);
frame.position(0);
frame.limit(frame.capacity());
- appender.reset(frame, true);
+ outFrameOffset = 0;
+ tupleCountInOutFrame = 0;
}
return true;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
deleted file mode 100644
index 858c760..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregateStateFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group;
-
-import java.io.Serializable;
-
-/**
- *
- */
-public interface IAggregateStateFactory extends Serializable {
-
- /**
- * Get the (partial) state length in binary.
- *
- * @return
- */
- public int getStateLength();
-
- public Object createState();
-
- public boolean hasBinaryState();
-
- public boolean hasObjectState();
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index f3cf2e7..358bf42 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -16,7 +16,6 @@
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
/**
*
@@ -29,13 +28,20 @@
* @return
*/
public AggregateState createAggregateStates();
-
+
/**
* Get the length of the binary states.
*
* @return
*/
- public int getAggregateStatesLength();
+ public int getBinaryAggregateStateLength(IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) throws HyracksDataException;
+
+ public int getPartialOutputLength(IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException;
+
+ public int getFinalOutputLength(IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException;
/**
* Initialize the state based on the input tuple.
@@ -49,9 +55,8 @@
* The state to be initialized.
* @throws HyracksDataException
*/
- public boolean init(FrameTupleAppender appender,
- IFrameTupleAccessor accessor, int tIndex, AggregateState state)
- throws HyracksDataException;
+ public void init(byte[] buf, int offset, IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) throws HyracksDataException;
/**
* Reset the aggregator. The corresponding aggregate state should be reset
@@ -93,7 +98,7 @@
* The aggregation state.
* @throws HyracksDataException
*/
- public boolean outputPartialResult(FrameTupleAppender appender,
+ public void outputPartialResult(byte[] data, int offset,
IFrameTupleAccessor accessor, int tIndex, AggregateState state)
throws HyracksDataException;
@@ -109,7 +114,7 @@
* The aggregation state.
* @throws HyracksDataException
*/
- public boolean outputFinalResult(FrameTupleAppender appender,
+ public void outputFinalResult(byte[] data, int offset,
IFrameTupleAccessor accessor, int tIndex, AggregateState state)
throws HyracksDataException;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index 3cf3d34..1471318 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -23,22 +23,20 @@
*
*/
public interface IFieldAggregateDescriptor {
-
- public IAggregateStateFactory getAggregateStateFactory();
/**
- * Initialize the state based on the input tuple.
+ * Initialize the state based on the input tuple.
*
* @param accessor
* @param tIndex
* @param fieldOutput
* The data output for the frame containing the state. This may
- * be null, if the state is maintained as a java object.
- *
- * Note that we have an assumption that the initialization of
- * the binary state (if any) inserts the state fields into the
- * buffer in a appending fashion. This means that an arbitrary
- * initial size of the state can be accquired.
+ * be null, if the state is maintained as a java object.
+ *
+ * Note that we have an assumption that the initialization of the
+ * binary state (if any) inserts the state fields into the buffer
+ * in an appending fashion. This means that an arbitrary initial
+ * size of the state can be acquired.
* @param state
* The state to be initialized.
* @throws HyracksDataException
@@ -65,11 +63,12 @@
* @param data
* The buffer containing the state, if frame-based-state is used.
* This means that it can be null if java-object-based-state is
- * used.
- *
+ * used.
+ *
* Here the length of binary state can be obtains from the state
* parameter, and if the content to be filled into that is over-
- * flowing (larger than the reversed space), error should be emit.
+ * flowing (larger than the reserved space), an error should be
+ * emitted.
* @param offset
* @param state
* The aggregate state.
@@ -109,6 +108,24 @@
public void outputFinalResult(DataOutput fieldOutput, byte[] data,
int offset, AggregateState state) throws HyracksDataException;
+ public boolean needsBinaryState();
+
+ public boolean needsObjectState();
+
+ public int getBinaryStateLength(IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException;
+
+ public int getPartialResultLength(byte[] data, int offset,
+ AggregateState state) throws HyracksDataException;
+
+ public int getFinalResultLength(byte[] data, int offset,
+ AggregateState state) throws HyracksDataException;
+
+ public AggregateState createState();
+
+ /**
+ * Close the field aggregator
+ */
public void close();
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index 3f2efaf..17f62f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class PreclusteredGroupWriter implements IFrameWriter {
@@ -34,24 +33,29 @@
private final ByteBuffer copyFrame;
private final FrameTupleAccessor inFrameAccessor;
private final FrameTupleAccessor copyFrameAccessor;
+
private final ByteBuffer outFrame;
- private final FrameTupleAppender appender;
+ private int outFrameOffset;
+
private boolean first;
- public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
- IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
+ public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields,
+ IBinaryComparator[] comparators, IAggregatorDescriptor aggregator,
+ RecordDescriptor inRecordDesc, IFrameWriter writer) {
this.groupFields = groupFields;
this.comparators = comparators;
this.aggregator = aggregator;
this.aggregateState = aggregator.createAggregateStates();
this.writer = writer;
copyFrame = ctx.allocateFrame();
- inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
- copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+ inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ inRecordDesc);
+ copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ inRecordDesc);
copyFrameAccessor.reset(copyFrame);
+
outFrame = ctx.allocateFrame();
- appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(outFrame, true);
+ outFrameOffset = 0;
}
@Override
@@ -66,49 +70,78 @@
int nTuples = inFrameAccessor.getTupleCount();
for (int i = 0; i < nTuples; ++i) {
if (first) {
- aggregator.init(null, inFrameAccessor, i, aggregateState);
+
+ aggregator.init(null, 0, inFrameAccessor, i, aggregateState);
+
first = false;
+
} else {
if (i == 0) {
- switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+ switchGroupIfRequired(copyFrameAccessor,
+ copyFrameAccessor.getTupleCount() - 1,
+ inFrameAccessor, i);
} else {
- switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+ switchGroupIfRequired(inFrameAccessor, i - 1,
+ inFrameAccessor, i);
}
-
+
}
}
FrameUtils.copy(buffer, copyFrame);
}
- private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
- FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
- if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+ private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor,
+ int prevTupleIndex, FrameTupleAccessor currTupleAccessor,
+ int currTupleIndex) throws HyracksDataException {
+ if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor,
+ currTupleIndex)) {
writeOutput(prevTupleAccessor, prevTupleIndex);
- aggregator.init(null, currTupleAccessor, currTupleIndex, aggregateState);
+
+ aggregator.init(null, 0, currTupleAccessor, currTupleIndex,
+ aggregateState);
} else {
- aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+ aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0,
+ aggregateState);
}
}
- private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
- throws HyracksDataException {
- if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
- FrameUtils.flushFrame(appender.getBuffer(), writer);
- appender.reset(appender.getBuffer(), true);
- if (!aggregator.outputFinalResult(appender, lastTupleAccessor, lastTupleIndex, aggregateState)) {
- throw new IllegalStateException();
+ private void writeOutput(final FrameTupleAccessor lastTupleAccessor,
+ int lastTupleIndex) throws HyracksDataException {
+
+ int outLen = aggregator.getFinalOutputLength(lastTupleAccessor,
+ lastTupleIndex, aggregateState);
+
+ if (FrameToolsForGroupers.isFrameOverflowing(outFrame, outLen, (outFrameOffset == 0))) {
+ FrameUtils.flushFrame(outFrame, writer);
+ outFrameOffset = 0;
+ if (FrameToolsForGroupers.isFrameOverflowing(outFrame, outLen, (outFrameOffset == 0))) {
+ throw new HyracksDataException(
+ "The output cannot be fit into a frame.");
}
}
+
+ aggregator.outputFinalResult(outFrame.array(), outFrameOffset,
+ lastTupleAccessor, lastTupleIndex, aggregateState);
+
+ FrameToolsForGroupers.updateFrameMetaForNewTuple(outFrame, outLen,
+ (outFrameOffset == 0));
+
+ outFrameOffset += outLen;
+
}
- private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+ private boolean sameGroup(FrameTupleAccessor a1, int t1Idx,
+ FrameTupleAccessor a2, int t2Idx) {
for (int i = 0; i < comparators.length; ++i) {
int fIdx = groupFields[i];
- int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+ int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength()
+ + a1.getFieldStartOffset(t1Idx, fIdx);
int l1 = a1.getFieldLength(t1Idx, fIdx);
- int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+ int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength()
+ + a2.getFieldStartOffset(t2Idx, fIdx);
int l2 = a2.getFieldLength(t2Idx, fIdx);
- if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+ if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2
+ .getBuffer().array(), s2, l2) != 0) {
return false;
}
}
@@ -123,9 +156,10 @@
@Override
public void close() throws HyracksDataException {
if (!first) {
- writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(appender.getBuffer(), writer);
+ writeOutput(copyFrameAccessor,
+ copyFrameAccessor.getTupleCount() - 1);
+ if (outFrameOffset > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
}
}
aggregateState.close();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index 32a7f49..f68f683 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
@@ -158,32 +157,43 @@
}
@Override
- public IAggregateStateFactory getAggregateStateFactory() {
- return new IAggregateStateFactory() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean hasObjectState() {
- return useObjectState;
- }
-
- @Override
- public boolean hasBinaryState() {
- return !useObjectState;
- }
-
- @Override
- public int getStateLength() {
- return 8;
- }
-
- @Override
- public Object createState() {
- return new Integer[]{0, 0};
- }
- };
+ public boolean needsObjectState() {
+ return useObjectState;
}
+
+ @Override
+ public boolean needsBinaryState() {
+ return !useObjectState;
+ }
+
+ @Override
+ public AggregateState createState() {
+ return new AggregateState(new Integer[]{0, 0});
+ }
+
+ @Override
+ public int getBinaryStateLength(IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state)
+ throws HyracksDataException {
+ if(useObjectState){
+ return 0;
+ } else {
+ return 8;
+ }
+ }
+
+ @Override
+ public int getPartialResultLength(byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
+ return 8;
+ }
+
+ @Override
+ public int getFinalResultLength(byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
+ return 4;
+ }
+
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index 8f43dec..f463119 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
@@ -133,34 +132,44 @@
}
@Override
- public IAggregateStateFactory getAggregateStateFactory() {
- return new IAggregateStateFactory() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean hasObjectState() {
- return useObjectState;
- }
-
- @Override
- public boolean hasBinaryState() {
- return !useObjectState;
- }
-
- @Override
- public int getStateLength() {
- return 8;
- }
-
- @Override
- public Object createState() {
- return new Integer[]{0, 0};
- }
- };
+ public boolean needsObjectState() {
+ return useObjectState;
+ }
+
+ @Override
+ public boolean needsBinaryState() {
+ return !useObjectState;
}
@Override
+ public int getBinaryStateLength(IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state)
+ throws HyracksDataException {
+ if(useObjectState){
+ return 0;
+ } else {
+ return 8;
+ }
+ }
+
+ @Override
+ public int getPartialResultLength(byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
+ return 8;
+ }
+
+ @Override
+ public int getFinalResultLength(byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
+ return 4;
+ }
+
+ @Override
+ public AggregateState createState() {
+ return new AggregateState(new Integer[]{0, 0});
+ }
+
+ @Override
public void init(IFrameTupleAccessor accessor,
int tIndex, DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index 430284a..a4214a7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
@@ -35,29 +34,36 @@
IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
-
+
private final boolean useObjectState;
-
- public CountFieldAggregatorFactory(boolean useObjectState){
+
+ public CountFieldAggregatorFactory(boolean useObjectState) {
this.useObjectState = useObjectState;
}
- /* (non-Javadoc)
- * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+ * IFieldAggregateDescriptorFactory
+ * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+ * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
*/
@Override
public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
return new IFieldAggregateDescriptor() {
-
+
@Override
public void reset() {
}
-
+
@Override
- public void outputPartialResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
+ public void outputPartialResult(DataOutput fieldOutput,
+ byte[] data, int offset, AggregateState state)
+ throws HyracksDataException {
int count;
if (!useObjectState) {
count = IntegerSerializerDeserializer.getInt(data, offset);
@@ -71,10 +77,11 @@
"I/O exception when writing aggregation to the output buffer.");
}
}
-
+
@Override
public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
+ int offset, AggregateState state)
+ throws HyracksDataException {
int count;
if (!useObjectState) {
count = IntegerSerializerDeserializer.getInt(data, offset);
@@ -88,7 +95,7 @@
"I/O exception when writing aggregation to the output buffer.");
}
}
-
+
@Override
public void init(IFrameTupleAccessor accessor, int tIndex,
DataOutput fieldOutput, AggregateState state)
@@ -105,40 +112,24 @@
state.setState(count);
}
}
-
- @Override
- public IAggregateStateFactory getAggregateStateFactory() {
- return new IAggregateStateFactory() {
-
- private static final long serialVersionUID = 1L;
- @Override
- public boolean hasObjectState() {
- return useObjectState;
- }
-
- @Override
- public boolean hasBinaryState() {
- return !useObjectState;
- }
-
- @Override
- public int getStateLength() {
- return 4;
- }
-
- @Override
- public Object createState() {
- return new Integer(0);
- }
- };
+ public boolean needsObjectState() {
+ return useObjectState;
}
-
+
+ public boolean needsBinaryState() {
+ return !useObjectState;
+ }
+
+ public AggregateState createState() {
+ return new AggregateState(new Integer(0));
+ }
+
@Override
public void close() {
-
+
}
-
+
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex,
byte[] data, int offset, AggregateState state)
@@ -153,6 +144,26 @@
state.setState(count);
}
}
+
+ @Override
+ public int getBinaryStateLength(IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) {
+ if(useObjectState)
+ return 0;
+ return 4;
+ }
+
+ @Override
+ public int getPartialResultLength(byte[] data, int offset,
+ AggregateState state) {
+ return 4;
+ }
+
+ @Override
+ public int getFinalResultLength(byte[] data, int offset,
+ AggregateState state) {
+ return 4;
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 9c5062f..43de9c6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
@@ -37,7 +36,7 @@
private static final long serialVersionUID = 1L;
private final int aggField;
-
+
private final boolean useObjectState;
public IntSumFieldAggregatorFactory(int aggField, boolean useObjState) {
@@ -109,12 +108,12 @@
int sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-
+
sum += IntegerSerializerDeserializer.getInt(accessor
.getBuffer().array(),
tupleOffset + accessor.getFieldSlotsLength()
+ fieldStart);
-
+
if (!useObjectState) {
try {
fieldOutput.writeInt(sum);
@@ -127,32 +126,16 @@
}
}
- @Override
- public IAggregateStateFactory getAggregateStateFactory(){
- return new IAggregateStateFactory() {
-
- private static final long serialVersionUID = 1L;
+ public boolean needsObjectState() {
+ return useObjectState;
+ }
- @Override
- public boolean hasObjectState() {
- return useObjectState;
- }
-
- @Override
- public boolean hasBinaryState() {
- return !useObjectState;
- }
-
- @Override
- public int getStateLength() {
- return 4;
- }
-
- @Override
- public Object createState() {
- return new Integer(0);
- }
- };
+ public boolean needsBinaryState() {
+ return !useObjectState;
+ }
+
+ public AggregateState createState() {
+ return new AggregateState(new Integer(0));
}
@Override
@@ -171,7 +154,7 @@
.getBuffer().array(),
tupleOffset + accessor.getFieldSlotsLength()
+ fieldStart);
-
+
if (!useObjectState) {
ByteBuffer buf = ByteBuffer.wrap(data);
sum += buf.getInt(offset);
@@ -181,6 +164,26 @@
state.setState(sum);
}
}
+
+ @Override
+ public int getBinaryStateLength(IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) {
+ if (useObjectState)
+ return 0;
+ return 4;
+ }
+
+ @Override
+ public int getPartialResultLength(byte[] data, int offset,
+ AggregateState state) {
+ return 4;
+ }
+
+ @Override
+ public int getFinalResultLength(byte[] data, int offset,
+ AggregateState state) {
+ return 4;
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 9152093..35aa967 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
@@ -204,34 +203,92 @@
}
}
- @Override
- public IAggregateStateFactory getAggregateStateFactory() {
- return new IAggregateStateFactory() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean hasObjectState() {
- return true;
- }
-
- @Override
- public boolean hasBinaryState() {
- return hasBinaryState;
- }
-
- @Override
- public int getStateLength() {
- return 4;
- }
-
- @Override
- public Object createState() {
- return null;
- }
- };
+ public boolean needsObjectState() {
+ return true;
}
+
+ public boolean needsBinaryState() {
+ return hasBinaryState;
+ }
+
+ public AggregateState createState() {
+ return new AggregateState();
+ }
+
+ @Override
+ public int getBinaryStateLength(IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) {
+ int len = 0;
+
+ if (hasBinaryState) {
+ len = 4;
+ }
+
+ return len;
+ }
+
+ @Override
+ public int getPartialResultLength(byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
+ int len = 0;
+
+ if (hasBinaryState) {
+ int stateIdx = IntegerSerializerDeserializer.getInt(data,
+ offset);
+
+ Object[] storedState = (Object[]) state.getState();
+
+ len = getUTFLength(((String) (storedState[stateIdx])));
+ } else {
+ len = getUTFLength((String) (state.getState()));
+ }
+
+ return len;
+ }
+
+ @Override
+ public int getFinalResultLength(byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
+ int len = 0;
+
+ if (hasBinaryState) {
+ int stateIdx = IntegerSerializerDeserializer.getInt(data,
+ offset);
+
+ Object[] storedState = (Object[]) state.getState();
+
+ len = getUTFLength(((String) (storedState[stateIdx])));
+ } else {
+ len = getUTFLength((String) (state.getState()));
+ }
+
+ return len;
+ }
+
+ private int getUTFLength(String str) throws HyracksDataException {
+ int utflen = 0;
+ int c = 0;
+
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < str.length(); i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+
+ if (utflen > 65535) {
+ throw new HyracksDataException("encoded string too long: "
+ + utflen + " bytes");
+ }
+
+ return utflen + 2;
+ }
+
};
}
-
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 0a002da..27ad066 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -21,9 +21,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.FrameToolsForGroupers;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
@@ -37,7 +36,14 @@
private static final long serialVersionUID = 1L;
private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
+ private int[] keys;
+ public MultiFieldsAggregatorFactory(int[] keys,
+ IFieldAggregateDescriptorFactory[] aggregatorFactories) {
+ this.keys = keys;
+ this.aggregatorFactories = aggregatorFactories;
+ }
+
public MultiFieldsAggregatorFactory(
IFieldAggregateDescriptorFactory[] aggregatorFactories) {
this.aggregatorFactories = aggregatorFactories;
@@ -55,21 +61,22 @@
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
- throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, final int[] keyFields,
+ final int[] keyFieldsInPartialResults) throws HyracksDataException {
final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
- final IAggregateStateFactory[] aggregateStateFactories = new IAggregateStateFactory[aggregators.length];
for (int i = 0; i < aggregators.length; i++) {
aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
inRecordDescriptor, outRecordDescriptor);
- aggregateStateFactories[i] = aggregators[i]
- .getAggregateStateFactory();
}
-
- int stateTupleFieldCount = keyFields.length;
- for (int i = 0; i < aggregateStateFactories.length; i++) {
- if (aggregateStateFactories[i].hasBinaryState()) {
+
+ if(this.keys == null){
+ this.keys = keyFields;
+ }
+
+ int stateTupleFieldCount = keys.length;
+ for (int i = 0; i < aggregators.length; i++) {
+ if (aggregators[i].needsBinaryState()) {
stateTupleFieldCount++;
}
}
@@ -82,155 +89,204 @@
return new IAggregatorDescriptor() {
- private boolean initPending, outputPending;
-
@Override
public void reset() {
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].reset();
- aggregateStateFactories[i] = aggregators[i]
- .getAggregateStateFactory();
}
- initPending = false;
- outputPending = false;
}
@Override
- public boolean outputPartialResult(FrameTupleAppender appender,
+ public void outputPartialResult(byte[] buf, int offset,
IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- if (!outputPending) {
- resultTupleBuilder.reset();
- for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- resultTupleBuilder.addField(accessor, tIndex,
- keyFieldsInPartialResults[i]);
- }
- DataOutput dos = resultTupleBuilder.getDataOutput();
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- for (int i = 0; i < aggregators.length; i++) {
+ resultTupleBuilder.reset();
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+ resultTupleBuilder.addField(accessor, tIndex,
+ keyFieldsInPartialResults[i]);
+ }
+ DataOutput dos = resultTupleBuilder.getDataOutput();
+
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ for (int i = 0; i < aggregators.length; i++) {
+ int fieldOffset = accessor.getFieldStartOffset(tIndex,
+ keys.length + i);
+ aggregators[i].outputPartialResult(dos, accessor
+ .getBuffer().array(),
+ fieldOffset + accessor.getFieldSlotsLength()
+ + tupleOffset, ((AggregateState[]) state
+ .getState())[i]);
+ resultTupleBuilder.addFieldEndOffset();
+ }
+
+ if (buf != null)
+ FrameToolsForGroupers.writeFields(buf, offset, this
+ .getPartialOutputLength(accessor, tIndex, state),
+ resultTupleBuilder);
+
+ }
+
+ @Override
+ public void outputFinalResult(byte[] buf, int offset,
+ IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+
+ resultTupleBuilder.reset();
+
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+ resultTupleBuilder.addField(accessor, tIndex,
+ keyFieldsInPartialResults[i]);
+ }
+
+ DataOutput dos = resultTupleBuilder.getDataOutput();
+
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ for (int i = 0; i < aggregators.length; i++) {
+ if (aggregators[i].needsBinaryState()) {
int fieldOffset = accessor.getFieldStartOffset(tIndex,
- keyFields.length + i);
- aggregators[i].outputPartialResult(dos, accessor
+ keys.length + i);
+ aggregators[i].outputFinalResult(dos, accessor
.getBuffer().array(),
- fieldOffset + accessor.getFieldSlotsLength()
- + tupleOffset,
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldOffset,
((AggregateState[]) state.getState())[i]);
- resultTupleBuilder.addFieldEndOffset();
+ } else {
+ aggregators[i].outputFinalResult(dos, null, 0,
+ ((AggregateState[]) state.getState())[i]);
}
+ resultTupleBuilder.addFieldEndOffset();
}
- if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
- resultTupleBuilder.getByteArray(), 0,
- resultTupleBuilder.getSize())) {
- outputPending = true;
- return false;
- }
- outputPending = false;
- return true;
+ if (buf != null)
+ FrameToolsForGroupers.writeFields(buf, offset,
+ this.getFinalOutputLength(accessor, tIndex, state),
+ resultTupleBuilder);
}
@Override
- public boolean outputFinalResult(FrameTupleAppender appender,
- IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- if (!outputPending) {
- resultTupleBuilder.reset();
-
- for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
- resultTupleBuilder.addField(accessor, tIndex,
- keyFieldsInPartialResults[i]);
- }
-
- DataOutput dos = resultTupleBuilder.getDataOutput();
+ public int getPartialOutputLength(IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int stateLength = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- for (int i = 0; i < aggregators.length; i++) {
- if (aggregateStateFactories[i].hasBinaryState()) {
- int fieldOffset = accessor.getFieldStartOffset(
- tIndex, keyFields.length + i);
- aggregators[i].outputFinalResult(dos, accessor
- .getBuffer().array(), tupleOffset
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+ stateLength += accessor.getFieldLength(tIndex,
+ keyFieldsInPartialResults[i]);
+ // add length for slot offset
+ stateLength += 4;
+ }
+
+ for (int i = 0; i < aggregators.length; i++) {
+ int fieldOffset = 0;
+ if (aggregators[i].needsBinaryState()) {
+ fieldOffset = accessor.getFieldStartOffset(tIndex,
+ keys.length + i);
+ }
+ stateLength += aggregators[i].getPartialResultLength(
+ accessor.getBuffer().array(), tupleOffset
+ accessor.getFieldSlotsLength()
+ + fieldOffset,
+ ((AggregateState[]) state.getState())[i]);
+ // add length for slot offset
+ stateLength += 4;
+ }
+ return stateLength;
+ }
+
+ @Override
+ public int getFinalOutputLength(IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int stateLength = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+
+ for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+ stateLength += accessor.getFieldLength(tIndex,
+ keyFieldsInPartialResults[i]);
+ // add length for slot offset
+ stateLength += 4;
+ }
+
+ for (int i = 0; i < aggregators.length; i++) {
+ int fieldOffset = 0;
+ if (aggregators[i].needsBinaryState()) {
+ fieldOffset = accessor.getFieldStartOffset(tIndex,
+ keys.length + i);
+ }
+ stateLength += aggregators[i].getFinalResultLength(accessor
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ fieldOffset, ((AggregateState[]) state
.getState())[i]);
- } else {
- aggregators[i].outputFinalResult(dos, null, 0,
- ((AggregateState[]) state.getState())[i]);
- }
- resultTupleBuilder.addFieldEndOffset();
- }
+ // add length for slot offset
+ stateLength += 4;
}
- if (!appender.append(resultTupleBuilder.getFieldEndOffsets(),
- resultTupleBuilder.getByteArray(), 0,
- resultTupleBuilder.getSize())) {
- outputPending = true;
- return false;
- }
- outputPending = false;
- return true;
+ return stateLength;
}
@Override
- public boolean init(FrameTupleAppender appender,
+ public void init(byte[] buf, int offset,
IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- if (!initPending) {
- stateTupleBuilder.reset();
- for (int i = 0; i < keyFields.length; i++) {
- stateTupleBuilder.addField(accessor, tIndex,
- keyFields[i]);
- }
- DataOutput dos = stateTupleBuilder.getDataOutput();
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].init(accessor, tIndex, dos,
- ((AggregateState[]) state.getState())[i]);
- if (aggregateStateFactories[i].hasBinaryState()) {
- stateTupleBuilder.addFieldEndOffset();
- }
+ stateTupleBuilder.reset();
+ for (int i = 0; i < keys.length; i++) {
+ stateTupleBuilder.addField(accessor, tIndex, keys[i]);
+ }
+ DataOutput dos = stateTupleBuilder.getDataOutput();
+
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].init(accessor, tIndex, dos,
+ ((AggregateState[]) state.getState())[i]);
+ if (aggregators[i].needsBinaryState()) {
+ stateTupleBuilder.addFieldEndOffset();
}
}
- // For pre-cluster: no output state is needed
- if(appender == null){
- initPending = false;
- return true;
- }
- if (!appender.append(stateTupleBuilder.getFieldEndOffsets(),
- stateTupleBuilder.getByteArray(), 0,
- stateTupleBuilder.getSize())) {
- initPending = true;
- return false;
- }
- initPending = false;
- return true;
+ if (buf != null)
+ FrameToolsForGroupers.writeFields(buf, offset, this
+ .getBinaryAggregateStateLength(accessor, tIndex,
+ state), stateTupleBuilder);
}
@Override
public AggregateState createAggregateStates() {
- AggregateState aggregateStates = new AggregateState();
- AggregateState[] states = new AggregateState[aggregateStateFactories.length];
+ AggregateState[] states = new AggregateState[aggregators.length];
for (int i = 0; i < states.length; i++) {
- states[i] = new AggregateState();
- states[i]
- .setState(aggregateStateFactories[i].createState());
+ states[i] = aggregators[i].createState();
}
- aggregateStates.setState(states);
- return aggregateStates;
+ return new AggregateState(states);
}
@Override
- public int getAggregateStatesLength() {
+ public int getBinaryAggregateStateLength(
+ IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
int stateLength = 0;
- for (int i = 0; i < aggregateStateFactories.length; i++) {
- stateLength += aggregateStateFactories[i].getStateLength();
+
+ for (int i = 0; i < keys.length; i++) {
+ stateLength += accessor
+ .getFieldLength(tIndex, keys[i]);
+ // add length for slot offset
+ stateLength += 4;
+ }
+
+ for (int i = 0; i < aggregators.length; i++) {
+ if (aggregators[i].needsBinaryState()) {
+ stateLength += aggregators[i].getBinaryStateLength(
+ accessor, tIndex,
+ ((AggregateState[]) state.getState())[i]);
+ // add length for slot offset
+ stateLength += 4;
+ }
}
return stateLength;
}
@Override
public void close() {
- for(int i = 0; i < aggregators.length; i++){
+ for (int i = 0; i < aggregators.length; i++) {
aggregators[i].close();
}
}
@@ -244,10 +300,10 @@
.getTupleStartOffset(stateTupleIndex);
int fieldIndex = 0;
for (int i = 0; i < aggregators.length; i++) {
- if (aggregateStateFactories[i].hasBinaryState()) {
+ if (aggregators[i].needsBinaryState()) {
int stateFieldOffset = stateAccessor
.getFieldStartOffset(stateTupleIndex,
- keyFields.length + fieldIndex);
+ keys.length + fieldIndex);
aggregators[i].aggregate(
accessor,
tIndex,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index f171239..717cea5 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -67,7 +67,7 @@
final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
- "data/tpch0.001/lineitem.tbl"))) });
+ "data/tpch0.001/lineitem.tbl"))) });
final RecordDescriptor desc = new RecordDescriptor(
new ISerializerDeserializer[] {
@@ -765,6 +765,7 @@
IntegerSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 8, 0 };
+ int[] keys = new int[] {0, 1};
int frameLimits = 4;
int tableSize = 8;
@@ -776,11 +777,11 @@
UTF8StringBinaryComparatorFactory.INSTANCE,
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
+ new MultiFieldsAggregatorFactory(keyFields,
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(1, false),
new IntSumFieldAggregatorFactory(3, false) }),
- new MultiFieldsAggregatorFactory(
+ new MultiFieldsAggregatorFactory(keys,
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(2, false),
new IntSumFieldAggregatorFactory(3, false) }),
@@ -966,8 +967,8 @@
new IntSumFieldAggregatorFactory(1, false),
new CountFieldAggregatorFactory(false),
new AvgFieldGroupAggregatorFactory(1, false) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
+ new MultiFieldsAggregatorFactory(new int[]{0, 1},
+ new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(2, false),
new IntSumFieldAggregatorFactory(3, false),
new AvgFieldMergeAggregatorFactory(4, false) }),
@@ -1148,7 +1149,7 @@
new IntSumFieldAggregatorFactory(1, false),
new MinMaxStringFieldAggregatorFactory(15,
true, true) }),
- new MultiFieldsAggregatorFactory(
+ new MultiFieldsAggregatorFactory(new int[] {0, 1},
new IFieldAggregateDescriptorFactory[] {
new IntSumFieldAggregatorFactory(2, false),
new MinMaxStringFieldAggregatorFactory(3, true,