Update issue #52:
- rewrote the aggregator interface to create a state factory;
- added a wrapper interface for aggregation and changed the original aggregators to be field aggregators, and added a multi-field aggregator wrapper;
- rewrote test cases for new interface;
- added count field aggregator.
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@879 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
index f9c3b90..f273c37 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/AggregateState.java
@@ -37,9 +37,5 @@
public Object getState() {
return state;
}
-
- public int getLength() {
- return -1;
- }
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
index ba538f7..680e7cb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ExternalGroupOperatorDescriptor.java
@@ -67,8 +67,9 @@
private final IBinaryComparatorFactory[] comparatorFactories;
private final INormalizedKeyComputerFactory firstNormalizerFactory;
- private final IFieldAggregateDescriptorFactory[] aggregateFactories;
- private final IFieldAggregateDescriptorFactory[] mergeFactories;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+ private final IAggregatorDescriptorFactory mergerFactory;
+
private final int framesLimit;
private final ISpillableTableFactory spillableTableFactory;
private final boolean isOutputSorted;
@@ -77,8 +78,8 @@
int[] keyFields, int framesLimit,
IBinaryComparatorFactory[] comparatorFactories,
INormalizedKeyComputerFactory firstNormalizerFactory,
- IFieldAggregateDescriptorFactory[] aggregateFactories,
- IFieldAggregateDescriptorFactory[] mergeFactories,
+ IAggregatorDescriptorFactory aggregatorFactory,
+ IAggregatorDescriptorFactory mergerFactory,
RecordDescriptor recordDescriptor,
ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
super(spec, 1, 1);
@@ -92,8 +93,8 @@
"frame limit should at least be 2, but it is "
+ framesLimit + "!");
}
- this.aggregateFactories = aggregateFactories;
- this.mergeFactories = mergeFactories;
+ this.aggregatorFactory = aggregatorFactory;
+ this.mergerFactory = mergerFactory;
this.keyFields = keyFields;
this.comparatorFactories = comparatorFactories;
this.firstNormalizerFactory = firstNormalizerFactory;
@@ -180,7 +181,7 @@
state.runs = new LinkedList<RunFileReader>();
state.gTable = spillableTableFactory.buildSpillableTable(
ctx, keyFields, comparatorFactories,
- firstNormalizerFactory, aggregateFactories,
+ firstNormalizerFactory, aggregatorFactory,
recordDescProvider.getInputRecordDescriptor(
getOperatorId(), 0), recordDescriptors[0],
ExternalGroupOperatorDescriptor.this.framesLimit);
@@ -273,14 +274,13 @@
comparators[i] = comparatorFactories[i]
.createBinaryComparator();
}
- final IFieldAggregateDescriptor[] currentWorkingAggregators = new IFieldAggregateDescriptor[mergeFactories.length];
- final AggregateState[] aggregateStates = new AggregateState[mergeFactories.length];
- for (int i = 0; i < currentWorkingAggregators.length; i++) {
- currentWorkingAggregators[i] = mergeFactories[i]
- .createAggregator(ctx, recordDescriptors[0],
- recordDescriptors[0]);
- aggregateStates[i] = currentWorkingAggregators[i].createState();
- }
+
+ final IAggregatorDescriptor aggregator = mergerFactory
+ .createAggregator(ctx, recordDescriptors[0],
+ recordDescriptors[0], keyFields);
+ final AggregateState aggregateState = aggregator
+ .createAggregateStates();
+
final int[] storedKeys = new int[keyFields.length];
/**
* Get the list of the fields in the stored records.
@@ -288,11 +288,6 @@
for (int i = 0; i < keyFields.length; ++i) {
storedKeys[i] = i;
}
- /**
- * Tuple builder
- */
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
- recordDescriptors[0].getFields().length);
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
/**
@@ -455,55 +450,28 @@
* Initialize the first output record Reset the
* tuple builder
*/
- tupleBuilder.reset();
- for (int i = 0; i < keyFields.length; i++) {
- tupleBuilder.addField(fta, tupleIndex, i);
- }
- for (int i = 0; i < currentWorkingAggregators.length; i++) {
- currentWorkingAggregators[i].init(fta,
- tupleIndex,
- tupleBuilder.getDataOutput(),
- aggregateStates[i]);
- tupleBuilder.addFieldEndOffset();
- }
- if (!outFrameAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
+
+ if (!aggregator.init(outFrameAppender, fta,
+ tupleIndex, aggregateState)) {
flushOutFrame(writer, finalPass);
- if (!outFrameAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize()))
+ if (!aggregator.init(outFrameAppender, fta,
+ tupleIndex, aggregateState)) {
throw new HyracksDataException(
"Failed to append an aggregation result to the output frame.");
+ }
}
+
} else {
/**
* if new tuple is in the same group of the
* current aggregator do merge and output to the
* outFrame
*/
- int tupleOffset = outFrameAccessor
- .getTupleStartOffset(currentTupleInOutFrame);
- int fieldOffset = outFrameAccessor
- .getFieldStartOffset(
- currentTupleInOutFrame,
- keyFields.length);
- for (int i = 0; i < currentWorkingAggregators.length; i++) {
- currentWorkingAggregators[i]
- .aggregate(
- fta,
- tupleIndex,
- outFrameAccessor
- .getBuffer()
- .array(),
- tupleOffset
- + outFrameAccessor
- .getFieldSlotsLength()
- + fieldOffset,
- aggregateStates[i]);
- }
+
+ aggregator.aggregate(fta, tupleIndex,
+ outFrameAccessor,
+ currentTupleInOutFrame, aggregateState);
+
}
tupleIndices[runIndex]++;
setNextTopTuple(runIndex, tupleIndices,
@@ -514,9 +482,8 @@
flushOutFrame(writer, finalPass);
}
- for (int i = 0; i < currentWorkingAggregators.length; i++) {
- currentWorkingAggregators[i].close();
- }
+ aggregator.close();
+
runs.subList(0, runNumber).clear();
/**
* insert the new run file into the beginning of the run
@@ -549,53 +516,30 @@
outFrameAccessor.reset(outFrame);
for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
- int tupleOffset = outFrameAccessor
- .getTupleStartOffset(i);
- finalTupleBuilder.reset();
for (int j = 0; j < keyFields.length; j++) {
finalTupleBuilder.addField(outFrameAccessor, i, j);
}
- for (int j = 0; j < currentWorkingAggregators.length; j++) {
- int fieldOffset = outFrameAccessor
- .getFieldStartOffset(i, keyFields.length
- + j);
- if (isFinal)
- currentWorkingAggregators[j].outputFinalResult(
- finalTupleBuilder.getDataOutput(),
- outFrameAccessor.getBuffer().array(),
- tupleOffset
- + outFrameAccessor
- .getFieldSlotsLength()
- + fieldOffset,
- aggregateStates[j]);
- else
- currentWorkingAggregators[j]
- .outputPartialResult(
- finalTupleBuilder
- .getDataOutput(),
- outFrameAccessor.getBuffer()
- .array(),
- tupleOffset
- + outFrameAccessor
- .getFieldSlotsLength()
- + fieldOffset,
- aggregateStates[j]);
- finalTupleBuilder.addFieldEndOffset();
+
+                        // Emit tuple i through the aggregator, in final or partial form
+                        // depending on isFinal. When the writer frame rejects the append,
+                        // flush it downstream and retry once on the emptied frame; a second
+                        // rejection is reported as an error.
+                        if(isFinal){
+                            if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                FrameUtils.flushFrame(writerFrame, writer);
+                                writerFrameAppender.reset(writerFrame, true);
+                                if(!aggregator.outputFinalResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                    throw new HyracksDataException(
+                                            "Failed to write final aggregation result to a writer frame!");
+                                }
+                            }
+                        } else {
+                            if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                FrameUtils.flushFrame(writerFrame, writer);
+                                writerFrameAppender.reset(writerFrame, true);
+                                if(!aggregator.outputPartialResult(writerFrameAppender, outFrameAccessor, i, aggregateState)){
+                                    // This branch writes partial results, so say so in the error.
+                                    throw new HyracksDataException(
+                                            "Failed to write partial aggregation result to a writer frame!");
+                                }
+                            }
}
-
- if (!writerFrameAppender.append(
- finalTupleBuilder.getFieldEndOffsets(),
- finalTupleBuilder.getByteArray(), 0,
- finalTupleBuilder.getSize())) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerFrameAppender.reset(writerFrame, true);
- if (!writerFrameAppender.append(
- finalTupleBuilder.getFieldEndOffsets(),
- finalTupleBuilder.getByteArray(), 0,
- finalTupleBuilder.getSize()))
- throw new HyracksDataException(
- "Failed to write final aggregation result to a writer frame!");
- }
+ aggregator.reset();
}
if (writerFrameAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(writerFrame, writer);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
index a61fd16..28a71ac 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/GroupingHashTable.java
@@ -28,206 +28,186 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
class GroupingHashTable {
- /**
- * The pointers in the link store 3 int values for each entry in the
- * hashtable: (bufferIdx, tIndex, accumulatorIdx).
- *
- * @author vinayakb
- */
- private static class Link {
- private static final int INIT_POINTERS_SIZE = 9;
+ /**
+ * The pointers in the link store 3 int values for each entry in the
+ * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+ *
+ * @author vinayakb
+ */
+ private static class Link {
+ private static final int INIT_POINTERS_SIZE = 9;
- int[] pointers;
- int size;
+ int[] pointers;
+ int size;
- Link() {
- pointers = new int[INIT_POINTERS_SIZE];
- size = 0;
- }
+ Link() {
+ pointers = new int[INIT_POINTERS_SIZE];
+ size = 0;
+ }
- void add(int bufferIdx, int tIndex, int accumulatorIdx) {
- while (size + 3 > pointers.length) {
- pointers = Arrays.copyOf(pointers, pointers.length * 2);
- }
- pointers[size++] = bufferIdx;
- pointers[size++] = tIndex;
- pointers[size++] = accumulatorIdx;
- }
- }
+ void add(int bufferIdx, int tIndex, int accumulatorIdx) {
+ while (size + 3 > pointers.length) {
+ pointers = Arrays.copyOf(pointers, pointers.length * 2);
+ }
+ pointers[size++] = bufferIdx;
+ pointers[size++] = tIndex;
+ pointers[size++] = accumulatorIdx;
+ }
+ }
- private static final int INIT_ACCUMULATORS_SIZE = 8;
- private final IHyracksTaskContext ctx;
- private final FrameTupleAppender appender;
- private final List<ByteBuffer> buffers;
- private final Link[] table;
- private AggregateState[][] aggregateStates;
- private int accumulatorSize;
+ private static final int INIT_AGG_STATE_SIZE = 8;
+ private final IHyracksTaskContext ctx;
+ private final FrameTupleAppender appender;
+ private final List<ByteBuffer> buffers;
+ private final Link[] table;
+ private AggregateState[] aggregateStates;
+ private int accumulatorSize;
- private int lastBIndex;
- private final int[] fields;
- private final int[] storedKeys;
- private final IBinaryComparator[] comparators;
- private final FrameTuplePairComparator ftpc;
- private final ITuplePartitionComputer tpc;
- private final IFieldAggregateDescriptor[] aggregators;
- private final RecordDescriptor inRecordDescriptor;
- private final RecordDescriptor outRecordDescriptor;
+ private int lastBIndex;
+ private final int[] fields;
+ private final int[] storedKeys;
+ private final IBinaryComparator[] comparators;
+ private final FrameTuplePairComparator ftpc;
+ private final ITuplePartitionComputer tpc;
+ private final IAggregatorDescriptor aggregator;
- private final FrameTupleAccessor storedKeysAccessor;
+ private final FrameTupleAccessor storedKeysAccessor;
- GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
- IBinaryComparatorFactory[] comparatorFactories,
- ITuplePartitionComputerFactory tpcf,
- IFieldAggregateDescriptorFactory[] aggregatorFactories,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int tableSize)
- throws HyracksDataException {
- this.ctx = ctx;
- appender = new FrameTupleAppender(ctx.getFrameSize());
- buffers = new ArrayList<ByteBuffer>();
- table = new Link[tableSize];
-
- this.aggregateStates = new AggregateState[aggregatorFactories.length][INIT_ACCUMULATORS_SIZE];
- accumulatorSize = 0;
-
- this.fields = fields;
- storedKeys = new int[fields.length];
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
- for (int i = 0; i < fields.length; ++i) {
- storedKeys[i] = i;
- storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
- }
-
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
- tpc = tpcf.createPartitioner();
-
- this.inRecordDescriptor = inRecordDescriptor;
- this.outRecordDescriptor = outRecordDescriptor;
-
- this.aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
- for (int i = 0; i < aggregatorFactories.length; i++) {
- this.aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
- this.inRecordDescriptor, this.outRecordDescriptor);
- }
- RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
- storedKeySerDeser);
- storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- storedKeysRecordDescriptor);
- lastBIndex = -1;
- addNewBuffer();
- }
+ GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
+ IBinaryComparatorFactory[] comparatorFactories,
+ ITuplePartitionComputerFactory tpcf,
+ IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int tableSize)
+ throws HyracksDataException {
+ this.ctx = ctx;
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ buffers = new ArrayList<ByteBuffer>();
+ table = new Link[tableSize];
- private void addNewBuffer() {
- ByteBuffer buffer = ctx.allocateFrame();
- buffer.position(0);
- buffer.limit(buffer.capacity());
- buffers.add(buffer);
- appender.reset(buffer, true);
- ++lastBIndex;
- }
+ this.fields = fields;
+ storedKeys = new int[fields.length];
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
+ for (int i = 0; i < fields.length; ++i) {
+ storedKeys[i] = i;
+ storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
+ }
- private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
- throws HyracksDataException {
- ByteBuffer frame = appender.getBuffer();
- frame.position(0);
- frame.limit(frame.capacity());
- writer.nextFrame(appender.getBuffer());
- appender.reset(appender.getBuffer(), true);
- }
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
+ tpc = tpcf.createPartitioner();
- void insert(FrameTupleAccessor accessor, int tIndex)
- throws Exception {
- int entry = tpc.partition(accessor, tIndex, table.length);
- Link link = table[entry];
- if (link == null) {
- link = table[entry] = new Link();
- }
- int saIndex = -1;
- for (int i = 0; i < link.size; i += 3) {
- int sbIndex = link.pointers[i];
- int stIndex = link.pointers[i + 1];
- storedKeysAccessor.reset(buffers.get(sbIndex));
- int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
- if (c == 0) {
- saIndex = link.pointers[i + 2];
- break;
- }
- }
- if (saIndex < 0) {
- // Did not find the key. Insert a new entry.
- saIndex = accumulatorSize++;
- if (!appender.appendProjection(accessor, tIndex, fields)) {
- addNewBuffer();
- if (!appender.appendProjection(accessor, tIndex, fields)) {
- throw new IllegalStateException();
- }
- }
- int sbIndex = lastBIndex;
- int stIndex = appender.getTupleCount() - 1;
- for (int i = 0; i < aggregators.length; i++) {
- AggregateState aggState = aggregators[i].createState();
- aggregators[i].init(accessor, tIndex, null, aggState);
- if (saIndex >= aggregateStates[i].length) {
- aggregateStates[i] = Arrays.copyOf(aggregateStates[i],
- aggregateStates[i].length * 2);
- }
- aggregateStates[i][saIndex] = aggState;
- }
- link.add(sbIndex, stIndex, saIndex);
- } else {
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].aggregate(accessor, tIndex, null, 0,
- aggregateStates[i][saIndex]);
- }
- }
- }
+ this.aggregator = aggregatorFactory.createAggregator(ctx,
+ inRecordDescriptor, outRecordDescriptor, fields);
- void write(IFrameWriter writer) throws HyracksDataException {
- ByteBuffer buffer = ctx.allocateFrame();
- appender.reset(buffer, true);
- ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
- for (int i = 0; i < table.length; ++i) {
- Link link = table[i];
- if (link != null) {
- for (int j = 0; j < link.size; j += 3) {
- int bIndex = link.pointers[j];
- int tIndex = link.pointers[j + 1];
- int aIndex = link.pointers[j + 2];
- ByteBuffer keyBuffer = buffers.get(bIndex);
- storedKeysAccessor.reset(keyBuffer);
+ this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
+ accumulatorSize = 0;
- tupleBuilder.reset();
- for (int k : this.fields) {
- tupleBuilder.addField(storedKeysAccessor, tIndex, k);
- }
- for (int k = 0; k < aggregators.length; k++) {
- aggregators[k].outputFinalResult(
- tupleBuilder.getDataOutput(), null, 0,
- aggregateStates[k][aIndex]);
- tupleBuilder.addFieldEndOffset();
- }
- while (!appender.append(tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- flushFrame(appender, writer);
- }
- }
- }
- }
- if (appender.getTupleCount() != 0) {
- flushFrame(appender, writer);
- }
- }
+ RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
+ storedKeySerDeser);
+ storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ storedKeysRecordDescriptor);
+ lastBIndex = -1;
+ addNewBuffer();
+ }
+
+	/**
+	 * Allocates a fresh frame from the task context, appends it to the list of
+	 * key buffers, redirects the appender to it and advances
+	 * {@code lastBIndex} so that it indexes the newly added buffer.
+	 */
+	private void addNewBuffer() {
+		final ByteBuffer frame = ctx.allocateFrame();
+		// Make the whole frame addressable: start at 0, limit at capacity.
+		frame.position(0);
+		frame.limit(frame.capacity());
+		buffers.add(frame);
+		appender.reset(frame, true);
+		lastBIndex++;
+	}
+
+	/**
+	 * Hands the appender's current frame to the writer and then resets the
+	 * appender onto that same frame so it can be filled again.
+	 *
+	 * @param appender
+	 *            appender whose backing frame is to be flushed
+	 * @param writer
+	 *            receiver of the flushed frame
+	 * @throws HyracksDataException
+	 *             propagated from {@code writer.nextFrame}
+	 */
+	private void flushFrame(FrameTupleAppender appender, IFrameWriter writer)
+			throws HyracksDataException {
+		final ByteBuffer outFrame = appender.getBuffer();
+		// Expose the entire frame to the writer before handing it over.
+		outFrame.position(0);
+		outFrame.limit(outFrame.capacity());
+		writer.nextFrame(outFrame);
+		appender.reset(outFrame, true);
+	}
+
+	/**
+	 * Routes one input tuple to its group. The tuple is hashed to a bucket and
+	 * the bucket's stored keys are compared against it; on a match the tuple is
+	 * aggregated into that group's state, otherwise its key fields are stored,
+	 * a new aggregate state is created and initialised, and the bucket gets a
+	 * new (bufferIdx, tIndex, accumulatorIdx) entry.
+	 *
+	 * @param accessor
+	 *            accessor over the frame holding the input tuple
+	 * @param tIndex
+	 *            index of the input tuple inside that frame
+	 * @throws IllegalStateException
+	 *             if the key projection is rejected even by a freshly added
+	 *             buffer
+	 */
+	void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
+		final int bucket = tpc.partition(accessor, tIndex, table.length);
+		if (table[bucket] == null) {
+			table[bucket] = new Link();
+		}
+		final Link chain = table[bucket];
+
+		// Walk the bucket's pointer triples looking for an equal stored key.
+		int stateSlot = -1;
+		for (int p = 0; p < chain.size; p += 3) {
+			storedKeysAccessor.reset(buffers.get(chain.pointers[p]));
+			if (ftpc.compare(accessor, tIndex, storedKeysAccessor,
+					chain.pointers[p + 1]) == 0) {
+				stateSlot = chain.pointers[p + 2];
+				break;
+			}
+		}
+
+		if (stateSlot >= 0) {
+			// Existing group: fold the tuple into the state already kept for it.
+			aggregator.aggregate(accessor, tIndex, null, 0,
+					aggregateStates[stateSlot]);
+			return;
+		}
+
+		// New group: reserve the next state slot and store the key fields,
+		// moving on to a new buffer when the current one rejects them.
+		stateSlot = accumulatorSize++;
+		if (!appender.appendProjection(accessor, tIndex, fields)) {
+			addNewBuffer();
+			if (!appender.appendProjection(accessor, tIndex, fields)) {
+				throw new IllegalStateException();
+			}
+		}
+		final int keyBufferIdx = lastBIndex;
+		final int keyTupleIdx = appender.getTupleCount() - 1;
+
+		// Create and initialise the aggregation state for the new group.
+		final AggregateState freshState = aggregator.createAggregateStates();
+		aggregator.init(appender, accessor, tIndex, freshState);
+
+		// Double the state array once the number of groups reaches its length.
+		if (accumulatorSize >= aggregateStates.length) {
+			aggregateStates = Arrays.copyOf(aggregateStates,
+					aggregateStates.length * 2);
+		}
+		aggregateStates[stateSlot] = freshState;
+
+		chain.add(keyBufferIdx, keyTupleIdx, stateSlot);
+	}
+
+	/**
+	 * Emits the final aggregation result of every stored group to
+	 * {@code writer}. The appender is pointed at a newly allocated output
+	 * frame; each hash bucket's (bufferIdx, tIndex, accumulatorIdx) triples are
+	 * visited in turn and the aggregator writes one result per triple. A
+	 * trailing, partly filled frame is flushed at the end.
+	 *
+	 * @param writer
+	 *            receiver of the output frames
+	 * @throws HyracksDataException
+	 *             if a result is still rejected after the output frame has
+	 *             been flushed, or if flushing a frame fails
+	 */
+	void write(IFrameWriter writer) throws HyracksDataException {
+		ByteBuffer buffer = ctx.allocateFrame();
+		appender.reset(buffer, true);
+		for (int i = 0; i < table.length; ++i) {
+			Link link = table[i];
+			if (link == null) {
+				continue;
+			}
+			for (int j = 0; j < link.size; j += 3) {
+				int bIndex = link.pointers[j];
+				int tIndex = link.pointers[j + 1];
+				int aIndex = link.pointers[j + 2];
+				storedKeysAccessor.reset(buffers.get(bIndex));
+
+				if (!aggregator.outputFinalResult(appender,
+						storedKeysAccessor, tIndex, aggregateStates[aIndex])) {
+					// Flush and retry exactly once. Retrying in an unbounded
+					// loop would spin forever, pushing empty frames to the
+					// writer, whenever a result cannot be accepted by an
+					// empty frame.
+					flushFrame(appender, writer);
+					if (!aggregator.outputFinalResult(appender,
+							storedKeysAccessor, tIndex,
+							aggregateStates[aIndex])) {
+						throw new HyracksDataException(
+								"Failed to write final aggregation result to the output frame.");
+					}
+				}
+				aggregator.reset();
+			}
+		}
+		if (appender.getTupleCount() != 0) {
+			flushFrame(appender, writer);
+		}
+	}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
index 1b46bdd..2596b91 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashGroupOperatorDescriptor.java
@@ -53,20 +53,20 @@
private final ITuplePartitionComputerFactory tpcf;
private final IBinaryComparatorFactory[] comparatorFactories;
- private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
private final int tableSize;
public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
ITuplePartitionComputerFactory tpcf,
IBinaryComparatorFactory[] comparatorFactories,
- IFieldAggregateDescriptorFactory[] aggregatorFactories,
+ IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor outRecordDescriptor, int tableSize) {
super(spec, 1, 1);
this.keys = keys;
this.tpcf = tpcf;
this.comparatorFactories = comparatorFactories;
- this.aggregatorFactories = aggregatorFactories;
+ this.aggregatorFactory = aggregatorFactory;
recordDescriptors[0] = outRecordDescriptor;
this.tableSize = tableSize;
}
@@ -138,7 +138,7 @@
state = new HashBuildActivityState(ctx.getJobletContext()
.getJobId(), new TaskId(getActivityId(), partition));
state.table = new GroupingHashTable(ctx, keys,
- comparatorFactories, tpcf, aggregatorFactories,
+ comparatorFactories, tpcf, aggregatorFactory,
recordDescProvider.getInputRecordDescriptor(
getOperatorId(), 0), recordDescriptors[0],
tableSize);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
index b3c36e6..1299a1e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/HashSpillableTableFactory.java
@@ -30,7 +30,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -72,7 +71,7 @@
final int[] keyFields,
IBinaryComparatorFactory[] comparatorFactories,
INormalizedKeyComputerFactory firstKeyNormalizerFactory,
- IFieldAggregateDescriptorFactory[] aggregatorFactories,
+ IAggregatorDescriptorFactory aggregateFactory,
RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, final int framesLimit)
throws HyracksDataException {
@@ -122,25 +121,15 @@
final ITuplePartitionComputer tpc = tpcf.createPartitioner();
final ByteBuffer outFrame = ctx.allocateFrame();
- final ArrayTupleBuilder internalTupleBuilder;
- if (keyFields.length < outRecordDescriptor.getFields().length)
- internalTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
- else
- internalTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length + 1);
- final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(
- outRecordDescriptor.getFields().length);
final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
: firstKeyNormalizerFactory.createNormalizedKeyComputer();
- final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
- final AggregateState[] aggregateStates = new AggregateState[aggregatorFactories.length];
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
- inRecordDescriptor, outRecordDescriptor);
- aggregateStates[i] = aggregators[i].createState();
- }
+ final IAggregatorDescriptor aggregator = aggregateFactory
+ .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
+ keyFields);
+
+ final AggregateState aggregateState = aggregator
+ .createAggregateStates();
return new ISpillableTable() {
@@ -206,9 +195,7 @@
dataFrameCount = -1;
tPointers = null;
table.reset();
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].close();
- }
+ aggregator.reset();
}
@Override
@@ -234,33 +221,15 @@
} while (true);
if (!foundGroup) {
- /**
- * If no matching group is found, create a new aggregator
- * Create a tuple for the new group
- */
- internalTupleBuilder.reset();
- for (int i = 0; i < keyFields.length; i++) {
- internalTupleBuilder.addField(accessor, tIndex,
- keyFields[i]);
- }
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].init(accessor, tIndex,
- internalTupleBuilder.getDataOutput(),
- aggregateStates[i]);
- internalTupleBuilder.addFieldEndOffset();
- }
- if (!appender.append(
- internalTupleBuilder.getFieldEndOffsets(),
- internalTupleBuilder.getByteArray(), 0,
- internalTupleBuilder.getSize())) {
+
+ if (!aggregator.init(appender, accessor, tIndex,
+ aggregateState)) {
if (!nextAvailableFrame()) {
return false;
} else {
- if (!appender.append(
- internalTupleBuilder.getFieldEndOffsets(),
- internalTupleBuilder.getByteArray(), 0,
- internalTupleBuilder.getSize())) {
- throw new IllegalStateException(
+ if (!aggregator.init(appender, accessor, tIndex,
+ aggregateState)) {
+ throw new HyracksDataException(
"Failed to init an aggregator");
}
}
@@ -270,24 +239,10 @@
storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
table.insert(entry, storedTuplePointer);
} else {
- // If there is a matching found, do aggregation directly
- int tupleOffset = storedKeysAccessor1
- .getTupleStartOffset(storedTuplePointer.tupleIndex);
- for (int i = 0; i < aggregators.length; i++) {
- int aggFieldOffset = storedKeysAccessor1
- .getFieldStartOffset(
- storedTuplePointer.tupleIndex,
- keyFields.length + i);
- aggregators[i].aggregate(
- accessor,
- tIndex,
- storedKeysAccessor1.getBuffer().array(),
- tupleOffset
- + storedKeysAccessor1
- .getFieldSlotsLength()
- + aggFieldOffset, aggregateStates[i]);
- }
+ aggregator.aggregate(accessor, tIndex, storedKeysAccessor1,
+ storedTuplePointer.tupleIndex, aggregateState);
+
}
return true;
}
@@ -323,62 +278,29 @@
int tIndex = storedTuplePointer.tupleIndex;
storedKeysAccessor1.reset(frames.get(bIndex));
- int tupleOffset = storedKeysAccessor1
- .getTupleStartOffset(tIndex);
- // Reset the tuple for the partial result
- outputTupleBuilder.reset();
- for (int k = 0; k < keyFields.length; k++) {
- outputTupleBuilder.addField(
- storedKeysAccessor1, tIndex, k);
- }
- for (int k = 0; k < aggregators.length; k++) {
- int fieldStart = storedKeysAccessor1
- .getFieldStartOffset(tIndex,
- keyFields.length + k);
- if (isPartial)
- aggregators[k]
- .outputPartialResult(
- outputTupleBuilder
- .getDataOutput(),
- storedKeysAccessor1
- .getBuffer()
- .array(),
- tupleOffset
- + storedKeysAccessor1
- .getFieldSlotsLength()
- + fieldStart,
- aggregateStates[k]);
- else
- aggregators[k]
- .outputFinalResult(
- outputTupleBuilder
- .getDataOutput(),
- storedKeysAccessor1
- .getBuffer()
- .array(),
- tupleOffset
- + storedKeysAccessor1
- .getFieldSlotsLength()
- + fieldStart,
- aggregateStates[k]);
- outputTupleBuilder.addFieldEndOffset();
- }
- while (!appender.append(
- outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
+ if (isPartial) {
+ while (!aggregator.outputPartialResult(
+ appender, storedKeysAccessor1, tIndex,
+ aggregateState)) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ }
+ } else {
+ while (!aggregator.outputFinalResult(appender,
+ storedKeysAccessor1, tIndex,
+ aggregateState)) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ }
}
+ aggregator.reset();
} while (true);
}
if (appender.getTupleCount() != 0) {
FrameUtils.flushFrame(outFrame, writer);
}
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].close();
- }
+ aggregator.close();
return;
}
int n = tPointers.length / 3;
@@ -393,54 +315,41 @@
ByteBuffer buffer = frames.get(frameIndex);
storedKeysAccessor1.reset(buffer);
- int tupleOffset = storedKeysAccessor1
- .getTupleStartOffset(tupleIndex);
-
- outputTupleBuilder.reset();
- for (int k = 0; k < keyFields.length; k++) {
- outputTupleBuilder.addField(storedKeysAccessor1,
- tupleIndex, k);
- }
- for (int k = 0; k < aggregators.length; k++) {
- int fieldStart = storedKeysAccessor1
- .getFieldStartOffset(tupleIndex,
- keyFields.length + k);
- if (isPartial)
- aggregators[k].outputPartialResult(
- outputTupleBuilder.getDataOutput(),
- storedKeysAccessor1.getBuffer().array(),
- tupleOffset
- + storedKeysAccessor1
- .getFieldSlotsLength()
- + fieldStart, aggregateStates[k]);
- else
- aggregators[k].outputFinalResult(outputTupleBuilder
- .getDataOutput(), storedKeysAccessor1
- .getBuffer().array(), tupleOffset
- + storedKeysAccessor1.getFieldSlotsLength()
- + fieldStart, aggregateStates[k]);
- outputTupleBuilder.addFieldEndOffset();
- }
- if (!appender.append(
- outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!appender.append(
- outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
- throw new IllegalStateException();
+                    // Emit the stored tuple in partial or final form depending on
+                    // isPartial. When the output frame rejects the append, flush it and
+                    // retry once on the emptied frame; a second rejection is an error.
+                    if (isPartial) {
+                        if (!aggregator
+                                .outputPartialResult(appender,
+                                        storedKeysAccessor1, tupleIndex,
+                                        aggregateState)) {
+                            FrameUtils.flushFrame(outFrame, writer);
+                            appender.reset(outFrame, true);
+                            if (!aggregator.outputPartialResult(appender,
+                                    storedKeysAccessor1, tupleIndex,
+                                    aggregateState)) {
+                                throw new HyracksDataException(
+                                        "Failed to output partial result.");
+                            }
+                        }
+                    } else {
+                        if (!aggregator
+                                .outputFinalResult(appender,
+                                        storedKeysAccessor1, tupleIndex,
+                                        aggregateState)) {
+                            FrameUtils.flushFrame(outFrame, writer);
+                            appender.reset(outFrame, true);
+                            if (!aggregator.outputFinalResult(appender,
+                                    storedKeysAccessor1, tupleIndex,
+                                    aggregateState)) {
+                                // This branch writes final results, so say so in the error.
+                                throw new HyracksDataException(
+                                        "Failed to output final result.");
+                            }
}
}
+ aggregator.reset();
}
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
}
- for (int i = 0; i < aggregators.length; i++) {
- aggregators[i].close();
- }
+ aggregator.close();
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
new file mode 100644
index 0000000..75c546f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregateStateFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+/**
+ * Factory for the state of a field aggregator, as returned by IFieldAggregateDescriptor#getAggregateStateFactory().
+ */
+public interface IAggregateStateFactory extends Serializable {
+ /** Length of the state when kept in binary form (presumably in bytes: the visible implementations return 4 per int; confirm). */
+ public int getStateLength();
+ /** Creates the initial state object (an Integer or an Integer[] in the visible implementations). */
+ public Object createState();
+ /** Whether state is kept in binary form in a buffer; the visible implementations return the negation of hasObjectState(). */
+ public boolean hasBinaryState();
+ /** Whether state is kept as a Java object inside the AggregateState. */
+ public boolean hasObjectState();
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
new file mode 100644
index 0000000..176806f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptor.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Aggregator operating on whole tuples: creates, initializes, updates and outputs aggregate states.
+ */
+public interface IAggregatorDescriptor {
+
+ /**
+ * Create an aggregate state.
+ *
+ * @return the newly created aggregate state
+ */
+ public AggregateState createAggregateStates();
+ /** Length of the aggregate states. NOTE(review): the unit (presumably bytes) is not visible here; confirm with implementations. */
+ public int getAggregateStatesLength();
+
+ /**
+ * Initialize the state based on the input tuple.
+ *
+ * @param appender
+ * The appender for the frame containing the state (role inferred from the signature; TODO confirm)
+ * @param accessor The accessor over the frame containing the input tuple
+ * @param tIndex The index of the input tuple within the accessor
+ * @param state
+ * The state to be initialized.
+ * @return presumably false when the appender cannot take the new entry, as with the output methods; TODO confirm
+ * @throws HyracksDataException
+ */
+ public boolean init(FrameTupleAppender appender,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException;
+
+ /**
+ * Reset the aggregator. The corresponding aggregate state should be reset
+ * too. Note that here the frame is not an input argument, since it can be
+ * reset outside of the aggregator (simply reset the starting index of the
+ * buffer).
+ *
+ * No state is passed in; the visible caller invokes this after outputting each result.
+ */
+ public void reset();
+
+ /**
+ * Aggregate the value. Aggregate state should be updated correspondingly.
+ *
+ * @param accessor The accessor over the frame containing the input tuple
+ * @param tIndex The index of the input tuple within the accessor
+ * @param stateAccessor
+ * The accessor over the frame containing the state, if frame-based-state is used.
+ * Presumably it can be null if java-object-based-state is
+ * used (TODO confirm).
+ * @param stateTupleIndex The index of the state tuple within stateAccessor
+ * @param state
+ * The aggregate state.
+ * @throws HyracksDataException
+ */
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+ AggregateState state) throws HyracksDataException;
+
+ /**
+ * Output the partial aggregation result.
+ *
+ * @param appender
+ * The appender for the output frame
+ * @param accessor
+ * The accessor over the frame containing the aggregation state
+ * @param tIndex The index of the tuple within the accessor
+ * @param state The aggregation state.
+ * @return false if the result was not appended; the visible caller then flushes the frame, resets the appender and retries once
+ * @throws HyracksDataException
+ */
+ public boolean outputPartialResult(FrameTupleAppender appender,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException;
+
+ /**
+ * Output the final aggregation result.
+ *
+ * @param appender
+ * The appender for the output frame
+ * @param accessor
+ * The accessor over the frame containing the aggregation state
+ * @param tIndex The index of the tuple within the accessor
+ * @param state The aggregation state.
+ * @return false if the result was not appended; the visible caller then flushes the frame, resets the appender and retries once
+ * @throws HyracksDataException
+ */
+ public boolean outputFinalResult(FrameTupleAppender appender,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException;
+ /** Close the aggregator; the visible caller invokes this once, after the last output frame has been flushed. */
+ public void close();
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..65b3873
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IAggregatorDescriptorFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Serializable factory that creates IAggregatorDescriptor instances.
+ */
+public interface IAggregatorDescriptorFactory extends Serializable {
+ /** Creates an aggregator from the task context, the input and output record descriptors and the key field indexes. */
+ IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
index 9bceea3..50da4cb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/IFieldAggregateDescriptor.java
@@ -23,13 +23,8 @@
*
*/
public interface IFieldAggregateDescriptor {
-
- /**
- * Create an aggregate state
- *
- * @return
- */
- public AggregateState createState();
+
+ public IAggregateStateFactory getAggregateStateFactory();
/**
* Initialize the state based on the input tuple.
@@ -46,6 +41,22 @@
public void init(IFrameTupleAccessor accessor, int tIndex,
DataOutput fieldOutput, AggregateState state)
throws HyracksDataException;
+
+ /**
+ * Initialize the state based on the input tuple.
+ *
+ * @param accessor
+ * @param tIndex
+ * @param fieldOutput
+ * The data output for the frame containing the state. This may
+ * be null, if the state is maintained as a java object
+ * @param state
+ * The state to be initialized.
+ * @throws HyracksDataException
+ */
+ public void initFromPartial(IFrameTupleAccessor accessor, int tIndex,
+ DataOutput fieldOutput, AggregateState state)
+ throws HyracksDataException;
/**
* Reset the aggregator. The corresponding aggregate state should be reset
@@ -55,7 +66,7 @@
*
* @param state
*/
- public void reset(AggregateState state);
+ public void reset();
/**
* Aggregate the value. Aggregate state should be updated correspondingly.
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
index 6420766..bae041d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/ISpillableTableFactory.java
@@ -25,7 +25,7 @@
public interface ISpillableTableFactory extends Serializable {
ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int[] keyFields,
IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory normalizedKeyComputerFactory,
- IFieldAggregateDescriptorFactory[] aggregatorFactories, RecordDescriptor inRecordDescriptor,
+ IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int framesLimit) throws HyracksDataException;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
similarity index 69%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
index 401fc51..3ecb968 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgFieldAggregatorFactory.java
@@ -24,20 +24,24 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
/**
*
*/
-public class AvgAggregatorFactory implements IFieldAggregateDescriptorFactory {
+public class AvgFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
private final int aggField;
- public AvgAggregatorFactory(int aggField){
+ private final boolean useObjectState;
+
+ public AvgFieldAggregatorFactory(int aggField, boolean useObjectState){
this.aggField = aggField;
+ this.useObjectState = useObjectState;
}
/* (non-Javadoc)
@@ -51,15 +55,14 @@
return new IFieldAggregateDescriptor() {
@Override
- public void reset(AggregateState state) {
- state.reset();
+ public void reset() {
}
@Override
public void outputPartialResult(DataOutput fieldOutput, byte[] data,
int offset, AggregateState state) throws HyracksDataException {
int sum, count;
- if (data != null) {
+ if (!useObjectState) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
count = IntegerSerializerDeserializer.getInt(data, offset + 4);
} else {
@@ -80,7 +83,7 @@
public void outputFinalResult(DataOutput fieldOutput, byte[] data,
int offset, AggregateState state) throws HyracksDataException {
int sum, count;
- if (data != null) {
+ if (!useObjectState) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
count = IntegerSerializerDeserializer.getInt(data, offset + 4);
} else {
@@ -109,7 +112,7 @@
tupleOffset + accessor.getFieldSlotsLength()
+ fieldStart);
count += 1;
- if (fieldOutput != null) {
+ if (!useObjectState) {
try {
fieldOutput.writeInt(sum);
fieldOutput.writeInt(count);
@@ -123,11 +126,6 @@
}
@Override
- public AggregateState createState() {
- return new AggregateState();
- }
-
- @Override
public void close() {
// TODO Auto-generated method stub
@@ -145,7 +143,7 @@
tupleOffset + accessor.getFieldSlotsLength()
+ fieldStart);
count += 1;
- if (data != null) {
+ if (!useObjectState) {
ByteBuffer buf = ByteBuffer.wrap(data);
sum += buf.getInt(offset);
count += buf.getInt(offset + 4);
@@ -158,6 +156,63 @@
state.setState(new Integer[]{sum, count});
}
}
+
+ @Override
+ public IAggregateStateFactory getAggregateStateFactory() {
+ return new IAggregateStateFactory() {
+ // Describes the average state, which is a (sum, count) pair of ints.
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean hasObjectState() {
+ return useObjectState;
+ }
+
+ @Override
+ public boolean hasBinaryState() {
+ return !useObjectState;
+ }
+ // Binary form is two ints written with writeInt (sum, then count): 4 + 4 bytes.
+ @Override
+ public int getStateLength() {
+ return 8;
+ }
+ // Object form is an Integer[] of {sum, count}, both starting at 0.
+ @Override
+ public Object createState() {
+ return new Integer[]{0, 0};
+ }
+ };
+ }
+
+ @Override
+ public void initFromPartial(IFrameTupleAccessor accessor,
+ int tIndex, DataOutput fieldOutput, AggregateState state)
+ throws HyracksDataException { // Initializes the state from a tuple whose aggField holds a partial (sum, count) pair.
+ int sum = 0;
+ int count = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+ sum += IntegerSerializerDeserializer.getInt(accessor // partial sum: the first int of aggField
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
+ count += IntegerSerializerDeserializer.getInt(accessor // partial count: the int stored 4 bytes after the sum
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart + 4);
+ if (!useObjectState) { // binary state: write (sum, count) to the state field output
+ try {
+ fieldOutput.writeInt(sum);
+ fieldOutput.writeInt(count);
+ } catch (IOException e) { // NOTE(review): the IOException is not attached as the cause of the rethrown exception
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
+ } else { // object state: keep (sum, count) inside the AggregateState
+ state.setState(new Integer[]{sum, count});
+ }
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
deleted file mode 100644
index df71f47..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgMergeAggregatorFactory.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-
-/**
- *
- */
-public class AvgMergeAggregatorFactory implements
- IFieldAggregateDescriptorFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final int aggField;
-
- public AvgMergeAggregatorFactory(int aggField){
- this.aggField = aggField;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
- * IFieldAggregateDescriptorFactory
- * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
- * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
- * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
- */
- @Override
- public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor) throws HyracksDataException {
- return new IFieldAggregateDescriptor() {
-
- @Override
- public void reset(AggregateState state) {
- state.reset();
- }
-
- @Override
- public void outputPartialResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
- int sum, count;
- if (data != null) {
- sum = IntegerSerializerDeserializer.getInt(data, offset);
- count = IntegerSerializerDeserializer.getInt(data, offset + 4);
- } else {
- Integer[] fields = (Integer[])state.getState();
- sum = fields[0];
- count = fields[1];
- }
- try {
- fieldOutput.writeInt(sum);
- fieldOutput.writeInt(count);
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
- }
-
- @Override
- public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
- int sum, count;
- if (data != null) {
- sum = IntegerSerializerDeserializer.getInt(data, offset);
- count = IntegerSerializerDeserializer.getInt(data, offset + 4);
- } else {
- Integer[] fields = (Integer[])state.getState();
- sum = fields[0];
- count = fields[1];
- }
- try {
- fieldOutput.writeFloat((float)sum/count);
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, AggregateState state)
- throws HyracksDataException {
- int sum = 0;
- int count = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- count += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart + 4);
- if (fieldOutput != null) {
- try {
- fieldOutput.writeInt(sum);
- fieldOutput.writeInt(count);
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- } else {
- state.setState(new Object[]{sum, count});
- }
- }
-
- @Override
- public AggregateState createState() {
- return new AggregateState();
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, AggregateState state)
- throws HyracksDataException {
- int sum = 0, count = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- count += 1;
- if (data != null) {
- ByteBuffer buf = ByteBuffer.wrap(data);
- sum += buf.getInt(offset);
- count += buf.getInt(offset + 4);
- buf.putInt(offset, sum);
- buf.putInt(offset + 4, count);
- } else {
- Integer[] fields = (Integer[])state.getState();
- sum += fields[0];
- count += fields[1];
- state.setState(new Object[]{sum, count});
- }
- }
- };
- }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
similarity index 61%
copy from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
copy to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
index 401fc51..a56c669 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/AvgAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/CountFieldAggregatorFactory.java
@@ -24,22 +24,24 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
/**
*
*/
-public class AvgAggregatorFactory implements IFieldAggregateDescriptorFactory {
-
+public class CountFieldAggregatorFactory implements
+ IFieldAggregateDescriptorFactory {
+
private static final long serialVersionUID = 1L;
- private final int aggField;
+ private final boolean useObjectState;
- public AvgAggregatorFactory(int aggField){
- this.aggField = aggField;
+ public CountFieldAggregatorFactory(boolean useObjectState){
+ this.useObjectState = useObjectState;
}
-
+
/* (non-Javadoc)
* @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
*/
@@ -47,28 +49,22 @@
public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-
return new IFieldAggregateDescriptor() {
@Override
- public void reset(AggregateState state) {
- state.reset();
+ public void reset() {
}
@Override
public void outputPartialResult(DataOutput fieldOutput, byte[] data,
int offset, AggregateState state) throws HyracksDataException {
- int sum, count;
- if (data != null) {
- sum = IntegerSerializerDeserializer.getInt(data, offset);
- count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+ int count;
+ if (!useObjectState) {
+ count = IntegerSerializerDeserializer.getInt(data, offset);
} else {
- Integer[] fields = (Integer[])state.getState();
- sum = fields[0];
- count = fields[1];
+ count = (Integer) state.getState();
}
try {
- fieldOutput.writeInt(sum);
fieldOutput.writeInt(count);
} catch (IOException e) {
throw new HyracksDataException(
@@ -79,17 +75,14 @@
@Override
public void outputFinalResult(DataOutput fieldOutput, byte[] data,
int offset, AggregateState state) throws HyracksDataException {
- int sum, count;
- if (data != null) {
- sum = IntegerSerializerDeserializer.getInt(data, offset);
- count = IntegerSerializerDeserializer.getInt(data, offset + 4);
+ int count;
+ if (!useObjectState) {
+ count = IntegerSerializerDeserializer.getInt(data, offset);
} else {
- Integer[] fields = (Integer[])state.getState();
- sum = fields[0];
- count = fields[1];
+ count = (Integer) state.getState();
}
try {
- fieldOutput.writeFloat((float)sum/count);
+ fieldOutput.writeInt(count);
} catch (IOException e) {
throw new HyracksDataException(
"I/O exception when writing aggregation to the output buffer.");
@@ -100,36 +93,49 @@
public void init(IFrameTupleAccessor accessor, int tIndex,
DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
- int sum = 0;
- int count = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- count += 1;
- if (fieldOutput != null) {
+ int count = 1;
+ if (!useObjectState) {
try {
- fieldOutput.writeInt(sum);
fieldOutput.writeInt(count);
} catch (IOException e) {
throw new HyracksDataException(
"I/O exception when initializing the aggregator.");
}
} else {
- state.setState(new Integer[]{sum, count});
+ state.setState(count);
}
}
@Override
- public AggregateState createState() {
- return new AggregateState();
+ public IAggregateStateFactory getAggregateStateFactory() {
+ return new IAggregateStateFactory() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean hasObjectState() {
+ return useObjectState;
+ }
+
+ @Override
+ public boolean hasBinaryState() {
+ return !useObjectState;
+ }
+
+ @Override
+ public int getStateLength() {
+ return 4;
+ }
+
+ @Override
+ public Object createState() {
+ return new Integer(0);
+ }
+ };
}
@Override
public void close() {
- // TODO Auto-generated method stub
}
@@ -137,27 +143,23 @@
public void aggregate(IFrameTupleAccessor accessor, int tIndex,
byte[] data, int offset, AggregateState state)
throws HyracksDataException {
- int sum = 0, count = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
- sum += IntegerSerializerDeserializer.getInt(accessor
- .getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
- count += 1;
- if (data != null) {
+ int count = 1;
+ if (!useObjectState) {
ByteBuffer buf = ByteBuffer.wrap(data);
- sum += buf.getInt(offset);
- count += buf.getInt(offset + 4);
- buf.putInt(offset, sum);
- buf.putInt(offset + 4, count);
+ count += buf.getInt(offset);
+ buf.putInt(offset, count);
} else {
- Integer[] fields = (Integer[])state.getState();
- sum += fields[0];
- count += fields[1];
- state.setState(new Integer[]{sum, count});
+ count += (Integer) state.getState();
+ state.setState(count);
}
}
+
+ @Override
+ public void initFromPartial(IFrameTupleAccessor accessor,
+ int tIndex, DataOutput fieldOutput, AggregateState state)
+ throws HyracksDataException {
+ init(accessor, tIndex, fieldOutput, state); // NOTE(review): init() always starts the count at 1 and never reads the tuple, so a count carried by a partial result is not used here; confirm this is intended.
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
similarity index 73%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
index d3b52f3..7db12f7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/IntSumFieldAggregatorFactory.java
@@ -24,21 +24,25 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
/**
*
*/
-public class IntSumAggregatorFactory implements
+public class IntSumFieldAggregatorFactory implements
IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
private final int aggField;
+
+ private final boolean useObjectState;
- public IntSumAggregatorFactory(int aggField) {
+ public IntSumFieldAggregatorFactory(int aggField, boolean useObjState) {
this.aggField = aggField;
+ this.useObjectState = useObjState;
}
/*
@@ -58,8 +62,7 @@
return new IFieldAggregateDescriptor() {
@Override
- public void reset(AggregateState state) {
- state.reset();
+ public void reset() {
}
@Override
@@ -67,10 +70,10 @@
byte[] data, int offset, AggregateState state)
throws HyracksDataException {
int sum;
- if (data != null) {
+ if (!useObjectState) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
} else {
- sum = (Integer) (state.getState());
+ sum = (Integer) state.getState();
}
try {
fieldOutput.writeInt(sum);
@@ -85,10 +88,10 @@
int offset, AggregateState state)
throws HyracksDataException {
int sum;
- if (data != null) {
+ if (!useObjectState) {
sum = IntegerSerializerDeserializer.getInt(data, offset);
} else {
- sum = (Integer) (state.getState());
+ sum = (Integer) state.getState();
}
try {
fieldOutput.writeInt(sum);
@@ -109,7 +112,7 @@
.getBuffer().array(),
tupleOffset + accessor.getFieldSlotsLength()
+ fieldStart);
- if (fieldOutput != null) {
+ if (!useObjectState) {
try {
fieldOutput.writeInt(sum);
} catch (IOException e) {
@@ -117,13 +120,36 @@
"I/O exception when initializing the aggregator.");
}
} else {
- state.setState(new Integer(sum));
+ state.setState(sum);
}
}
@Override
- public AggregateState createState() {
- return new AggregateState();
+ public IAggregateStateFactory getAggregateStateFactory(){
+ return new IAggregateStateFactory() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean hasObjectState() {
+ return useObjectState;
+ }
+
+ @Override
+ public boolean hasBinaryState() {
+ return !useObjectState;
+ }
+
+ @Override
+ public int getStateLength() {
+ return 4;
+ }
+
+ @Override
+ public Object createState() {
+ return new Integer(0);
+ }
+ };
}
@Override
@@ -142,15 +168,22 @@
.getBuffer().array(),
tupleOffset + accessor.getFieldSlotsLength()
+ fieldStart);
- if (data != null) {
+ if (!useObjectState) {
ByteBuffer buf = ByteBuffer.wrap(data);
sum += buf.getInt(offset);
buf.putInt(offset, sum);
} else {
- sum += (Integer) (state.getState());
- state.setState(new Integer(sum));
+ sum += (Integer) state.getState();
+ state.setState(sum);
}
}
+
+ @Override
+ public void initFromPartial(IFrameTupleAccessor accessor,
+ int tIndex, DataOutput fieldOutput, AggregateState state)
+ throws HyracksDataException {
+ init(accessor, tIndex, fieldOutput, state); // Reuses init(): a partial sum is written as a single int, presumably the same layout init() reads from aggField (confirm).
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
similarity index 81%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
index f53108a..b76c285 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -27,13 +27,14 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
/**
*
*/
-public class MinMaxStringAggregatorFactory implements
+public class MinMaxStringFieldAggregatorFactory implements
IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
@@ -41,10 +42,13 @@
private final int aggField;
private final boolean isMax;
+
+ private final boolean hasBinaryState;
- public MinMaxStringAggregatorFactory(int aggField, boolean isMax) {
+ public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax, boolean hasBinaryState) {
this.aggField = aggField;
this.isMax = isMax;
+ this.hasBinaryState = hasBinaryState; // replaces the old "data != null" / "fieldOutput != null" probes: selects the branches that read a state index from the state bytes
}
/*
@@ -63,8 +67,7 @@
return new IFieldAggregateDescriptor() {
@Override
- public void reset(AggregateState state) {
- state.reset();
+ public void reset() { // no-op: the AggregateState parameter was dropped from the signature, so nothing is reset here
}
@Override
@@ -72,7 +75,7 @@
byte[] data, int offset, AggregateState state)
throws HyracksDataException {
try {
- if (data != null) {
+ if (hasBinaryState) {
int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
Object[] storedState = (Object[]) state.getState();
fieldOutput.writeUTF((String)storedState[stateIdx]);
@@ -90,11 +93,11 @@
int offset, AggregateState state)
throws HyracksDataException {
try {
- if (data != null) {
+ if (hasBinaryState) { // binary state holds an int index into the Object[] kept in the AggregateState
int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
Object[] storedState = (Object[]) state.getState();
fieldOutput.writeUTF((String)storedState[stateIdx]);
} else {
fieldOutput.writeUTF((String) state.getState());
}
} catch (IOException e) {
@@ -116,7 +122,7 @@
.array(), tupleOffset
+ accessor.getFieldSlotsLength()
+ fieldStart, fieldLength)));
- if (fieldOutput != null) {
+ if (hasBinaryState) {
// Object-binary-state
Object[] storedState = (Object[]) state.getState();
if (storedState == null) {
@@ -146,11 +152,6 @@
}
@Override
- public AggregateState createState() {
- return new AggregateState();
- }
-
- @Override
public void close() {
// TODO Auto-generated method stub
@@ -169,7 +170,7 @@
.array(), tupleOffset
+ accessor.getFieldSlotsLength()
+ fieldStart, fieldLength)));
- if (data != null) {
+ if (hasBinaryState) {
int stateIdx = IntegerSerializerDeserializer.getInt(data,
offset);
Object[] storedState = (Object[]) state.getState();
@@ -199,6 +200,41 @@
}
}
}
+
+ @Override
+ public IAggregateStateFactory getAggregateStateFactory() {
+ return new IAggregateStateFactory() {
+
+ private static final long serialVersionUID = 1L;
+ // Object state is reported unconditionally, whatever hasBinaryState is.
+ @Override
+ public boolean hasObjectState() {
+ return true;
+ }
+ // Binary state is reported only when the factory was constructed with hasBinaryState == true.
+ @Override
+ public boolean hasBinaryState() {
+ return hasBinaryState;
+ }
+ // NOTE(review): 4 presumably is the size of the int state index read via IntegerSerializerDeserializer.getInt - confirm; it is returned even when hasBinaryState is false.
+ @Override
+ public int getStateLength() {
+ return 4;
+ }
+ // No initial object is supplied; NOTE(review): presumably init() installs the real state - confirm against callers.
+ @Override
+ public Object createState() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public void initFromPartial(IFrameTupleAccessor accessor,
+ int tIndex, DataOutput fieldOutput, AggregateState state)
+ throws HyracksDataException {
+ init(accessor, tIndex, fieldOutput, state); // a partial-result tuple is handled exactly like a raw input tuple: delegate to init() with the same arguments
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
new file mode 100644
index 0000000..7fe636a
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MultiFieldsAggregatorFactory.java
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateStateFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.AggregateState;
+
+/**
+ * Combines several single-field aggregators into one tuple-level aggregator.
+ * <p>
+ * A state tuple consists of the grouping keys followed by one field for each
+ * field aggregator whose state factory reports binary state; aggregators
+ * without binary state add no field. A result tuple consists of the grouping
+ * keys followed by one field per field aggregator.
+ */
+public class MultiFieldsAggregatorFactory implements
+        IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
+
+    /**
+     * @param aggregatorFactories
+     *            one field-aggregator factory per aggregated output field, in
+     *            output order
+     */
+    public MultiFieldsAggregatorFactory(
+            IFieldAggregateDescriptorFactory[] aggregatorFactories) {
+        this.aggregatorFactories = aggregatorFactories;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregatorDescriptorFactory
+     * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, final int[] keyFields)
+            throws HyracksDataException {
+
+        final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
+        final IAggregateStateFactory[] aggregateStateFactories = new IAggregateStateFactory[aggregators.length];
+        for (int i = 0; i < aggregators.length; i++) {
+            aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
+                    inRecordDescriptor, outRecordDescriptor);
+            aggregateStateFactories[i] = aggregators[i]
+                    .getAggregateStateFactory();
+        }
+
+        // Keys first, then one state field per aggregator with binary state.
+        int stateTupleFieldCount = keyFields.length;
+        for (int i = 0; i < aggregateStateFactories.length; i++) {
+            if (aggregateStateFactories[i].hasBinaryState()) {
+                stateTupleFieldCount++;
+            }
+        }
+
+        final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(
+                stateTupleFieldCount);
+
+        final ArrayTupleBuilder resultTupleBuilder = new ArrayTupleBuilder(
+                outRecordDescriptor.getFields().length);
+
+        return new IAggregatorDescriptor() {
+
+            // True while a built state tuple is still waiting to be appended.
+            private boolean statePending;
+
+            // True while a built result tuple is still waiting to be appended.
+            private boolean resultPending;
+
+            @Override
+            public void reset() {
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].reset();
+                    aggregateStateFactories[i] = aggregators[i]
+                            .getAggregateStateFactory();
+                }
+                statePending = false;
+                resultPending = false;
+            }
+
+            @Override
+            public boolean outputPartialResult(FrameTupleAppender appender,
+                    IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                if (!resultPending) {
+                    buildResultTuple(accessor, tIndex, state, true);
+                }
+                return appendResultTuple(appender);
+            }
+
+            @Override
+            public boolean outputFinalResult(FrameTupleAppender appender,
+                    IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                if (!resultPending) {
+                    buildResultTuple(accessor, tIndex, state, false);
+                }
+                return appendResultTuple(appender);
+            }
+
+            /**
+             * Rebuilds {@code resultTupleBuilder}: the key fields copied from
+             * the given tuple, then one output field per field aggregator.
+             * <p>
+             * Binary state fields are located with a running index that only
+             * counts aggregators with binary state, matching the layout
+             * written by {@code init} and read by {@code aggregate}.
+             *
+             * @param partial
+             *            {@code true} to emit partial results, {@code false}
+             *            to emit final results
+             */
+            private void buildResultTuple(IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state, boolean partial)
+                    throws HyracksDataException {
+                resultTupleBuilder.reset();
+                for (int i = 0; i < keyFields.length; i++) {
+                    resultTupleBuilder.addField(accessor, tIndex,
+                            keyFields[i]);
+                }
+                DataOutput dos = resultTupleBuilder.getDataOutput();
+
+                int fieldsStart = accessor.getTupleStartOffset(tIndex)
+                        + accessor.getFieldSlotsLength();
+                int binaryFieldIndex = 0;
+                for (int i = 0; i < aggregators.length; i++) {
+                    AggregateState fieldState = ((AggregateState[]) state
+                            .getState())[i];
+                    byte[] data = null;
+                    int offset = 0;
+                    if (aggregateStateFactories[i].hasBinaryState()) {
+                        data = accessor.getBuffer().array();
+                        offset = fieldsStart
+                                + accessor.getFieldStartOffset(tIndex,
+                                        keyFields.length + binaryFieldIndex);
+                        binaryFieldIndex++;
+                    }
+                    if (partial) {
+                        aggregators[i].outputPartialResult(dos, data, offset,
+                                fieldState);
+                    } else {
+                        aggregators[i].outputFinalResult(dos, data, offset,
+                                fieldState);
+                    }
+                    resultTupleBuilder.addFieldEndOffset();
+                }
+            }
+
+            /**
+             * Appends the built result tuple. On failure the tuple stays
+             * pending so that the next output call retries it unchanged; on
+             * success the pending flag is cleared so that the next call builds
+             * a fresh tuple.
+             */
+            private boolean appendResultTuple(FrameTupleAppender appender)
+                    throws HyracksDataException {
+                resultPending = !appender.append(
+                        resultTupleBuilder.getFieldEndOffsets(),
+                        resultTupleBuilder.getByteArray(), 0,
+                        resultTupleBuilder.getSize());
+                return !resultPending;
+            }
+
+            @Override
+            public boolean init(FrameTupleAppender appender,
+                    IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                if (!statePending) {
+                    stateTupleBuilder.reset();
+                    for (int i = 0; i < keyFields.length; i++) {
+                        stateTupleBuilder.addField(accessor, tIndex,
+                                keyFields[i]);
+                    }
+                    DataOutput dos = stateTupleBuilder.getDataOutput();
+
+                    for (int i = 0; i < aggregators.length; i++) {
+                        aggregators[i].init(accessor, tIndex, dos,
+                                ((AggregateState[]) state.getState())[i]);
+                        if (aggregateStateFactories[i].hasBinaryState()) {
+                            stateTupleBuilder.addFieldEndOffset();
+                        }
+                    }
+                }
+                // Keep the tuple pending only while the frame is full; clear the
+                // flag once it is stored so the next call builds a new tuple.
+                statePending = !appender.append(
+                        stateTupleBuilder.getFieldEndOffsets(),
+                        stateTupleBuilder.getByteArray(), 0,
+                        stateTupleBuilder.getSize());
+                return !statePending;
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                AggregateState aggregateStates = new AggregateState();
+                AggregateState[] states = new AggregateState[aggregateStateFactories.length];
+                for (int i = 0; i < states.length; i++) {
+                    states[i] = new AggregateState();
+                    states[i]
+                            .setState(aggregateStateFactories[i].createState());
+                }
+                aggregateStates.setState(states);
+                return aggregateStates;
+            }
+
+            @Override
+            public int getAggregateStatesLength() {
+                int stateLength = 0;
+                for (int i = 0; i < aggregateStateFactories.length; i++) {
+                    stateLength += aggregateStateFactories[i].getStateLength();
+                }
+                return stateLength;
+            }
+
+            @Override
+            public void close() {
+                // Release whatever the wrapped field aggregators hold.
+                for (int i = 0; i < aggregators.length; i++) {
+                    aggregators[i].close();
+                }
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+                    IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+                    AggregateState state) throws HyracksDataException {
+                if (stateAccessor != null) {
+                    int stateTupleOffset = stateAccessor
+                            .getTupleStartOffset(stateTupleIndex);
+                    int fieldIndex = 0;
+                    for (int i = 0; i < aggregators.length; i++) {
+                        if (aggregateStateFactories[i].hasBinaryState()) {
+                            int stateFieldOffset = stateAccessor
+                                    .getFieldStartOffset(stateTupleIndex,
+                                            keyFields.length + fieldIndex);
+                            aggregators[i].aggregate(
+                                    accessor,
+                                    tIndex,
+                                    stateAccessor.getBuffer().array(),
+                                    stateTupleOffset
+                                            + stateAccessor
+                                                    .getFieldSlotsLength()
+                                            + stateFieldOffset,
+                                    ((AggregateState[]) state.getState())[i]);
+                            fieldIndex++;
+                        } else {
+                            aggregators[i].aggregate(accessor, tIndex, null, 0,
+                                    ((AggregateState[]) state.getState())[i]);
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < aggregators.length; i++) {
+                        aggregators[i].aggregate(accessor, tIndex, null, 0,
+                                ((AggregateState[]) state.getState())[i]);
+                    }
+                }
+            }
+        };
+    }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 1e6766a..45468e7 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -42,10 +42,11 @@
import edu.uci.ics.hyracks.dataflow.std.aggregations.HashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.aggregations.HashSpillableTableFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgMergeAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MinMaxStringAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MinMaxStringFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MultiFieldsAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -140,9 +141,11 @@
keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new IFieldAggregateDescriptorFactory[] {
- new IntSumAggregatorFactory(1),
- new IntSumAggregatorFactory(3) }, outputRec, tableSize);
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true) }),
+ outputRec, tableSize);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
NC2_ID, NC1_ID);
@@ -193,12 +196,12 @@
frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
- new IFieldAggregateDescriptorFactory[] {
- new IntSumAggregatorFactory(1),
- new IntSumAggregatorFactory(3) },
- new IFieldAggregateDescriptorFactory[] {
- new IntSumAggregatorFactory(1),
- new IntSumAggregatorFactory(3) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false) }),
outputRec,
new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(
@@ -243,6 +246,7 @@
new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
@@ -255,9 +259,10 @@
keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new IFieldAggregateDescriptorFactory[] {
- new IntSumAggregatorFactory(1),
- new AvgAggregatorFactory(1) }, outputRec, tableSize);
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new CountFieldAggregatorFactory(true),
+ new AvgFieldAggregatorFactory(1, true) }), outputRec, tableSize);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
NC2_ID, NC1_ID);
@@ -296,6 +301,7 @@
new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
@@ -308,12 +314,14 @@
frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
- new IFieldAggregateDescriptorFactory[] {
- new IntSumAggregatorFactory(1),
- new AvgAggregatorFactory(1) },
- new IFieldAggregateDescriptorFactory[] {
- new IntSumAggregatorFactory(1),
- new AvgMergeAggregatorFactory(2) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldAggregatorFactory(1, false) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new CountFieldAggregatorFactory(false),
+ new AvgFieldAggregatorFactory(2, false) }),
outputRec,
new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(
@@ -370,9 +378,10 @@
keyFields,
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new IFieldAggregateDescriptorFactory[] {
- new IntSumAggregatorFactory(1),
- new MinMaxStringAggregatorFactory(15, true) }, outputRec, tableSize);
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new MinMaxStringFieldAggregatorFactory(15, true, false) }),
+ outputRec, tableSize);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
NC2_ID, NC1_ID);
@@ -423,12 +432,12 @@
frameLimits,
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
- new IFieldAggregateDescriptorFactory[] {
- new IntSumAggregatorFactory(1),
- new MinMaxStringAggregatorFactory(15, true) },
- new IFieldAggregateDescriptorFactory[] {
- new IntSumAggregatorFactory(1),
- new MinMaxStringAggregatorFactory(2, true) },
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(2, true, true) }),
outputRec,
new HashSpillableTableFactory(
new FieldHashPartitionComputerFactory(
@@ -458,5 +467,5 @@
spec.addRoot(printer);
runTest(spec);
}
-
+
}