Added hybrid-hash-group operator:
- added hybrid-hash-grouper operator;
- added external hash grouper in separated classes (so it can be invoked by hybrid-hash-grouper);
- updated aggregator interface for required features by hybrid-hash-grouper.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_external_gby_fix@1336 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 5be602c..80b3b20 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -32,6 +32,7 @@
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.TupleInFrameAccessor;
public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
@@ -169,6 +170,28 @@
}
}
+ @Override
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public int getFieldCount() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+ AggregateState state) throws HyracksDataException {
+ // it only works if the output of the aggregator fits in one
+ // frame
+ for (int i = 0; i < pipelines.length; i++) {
+ pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+ }
+ }
+
};
}
@@ -193,10 +216,7 @@
}
/**
- *
- *
* We suppose for now, that each subplan only produces one tuple.
- *
*/
private static class AggregatorOutput implements IFrameWriter {
@@ -231,10 +251,8 @@
}
/**
- *
* Since each pipeline only produces one tuple, this method is only
* called by the close method of the pipelines.
- *
*/
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index 951376e..5ac7bbe 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -15,6 +15,7 @@
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.TupleInFrameAccessor;
public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
@@ -28,7 +29,9 @@
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
throws HyracksDataException {
+
final int[] keys = keyFields;
+ final ArrayTupleBuilder initLengthCalBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
/**
* one IAggregatorDescriptor instance per Gby operator
@@ -183,6 +186,60 @@
}
}
+ @Override
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ initLengthCalBuilder.reset();
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < aggs.length; i++) {
+ try {
+ int begin = initLengthCalBuilder.getSize();
+ if (aggs[i] == null) {
+ aggs[i] = aggFactories[i].createAggregateFunction();
+ }
+ aggs[i].init(initLengthCalBuilder.getDataOutput());
+ initLengthCalBuilder.addFieldEndOffset();
+ stateFieldLength[i] = initLengthCalBuilder.getSize() - begin;
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ // doing initial aggregate
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < aggs.length; i++) {
+ try {
+ byte[] data = initLengthCalBuilder.getByteArray();
+ int prevFieldPos = i + keys.length - 1;
+ int start = prevFieldPos >= 0 ? initLengthCalBuilder.getFieldEndOffsets()[prevFieldPos] : 0;
+ aggs[i].step(ftr, data, start, stateFieldLength[i]);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return initLengthCalBuilder.getFieldEndOffsets().length * 4 + initLengthCalBuilder.getSize();
+ }
+
+ @Override
+ public int getFieldCount() {
+ return aggs.length;
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+ AggregateState state) throws HyracksDataException {
+ ftr.reset(accessor, tIndex);
+ int stateTupleStart = stateAccessor.getTupleStartOffset();
+ for (int i = 0; i < aggs.length; i++) {
+ try {
+ byte[] data = stateAccessor.getBuffer().array();
+ int start = stateAccessor.getFieldStartOffset(i + keys.length) + stateTupleStart;
+ aggs[i].step(ftr, data, start, stateFieldLength[i]);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
};
}
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 7a36943..e58e5ef 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.TupleInFrameAccessor;
public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
@@ -162,6 +163,23 @@
}
}
+ @Override
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ return 0;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return aggFactories.length;
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
};
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractImprovedHybridHashGrouper.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractImprovedHybridHashGrouper.java
new file mode 100644
index 0000000..8d8881c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractImprovedHybridHashGrouper.java
@@ -0,0 +1,805 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public abstract class AbstractImprovedHybridHashGrouper implements IFrameAllocator {
+
+ public final int NO_MORE_FREE_BUFFER = -1;
+ private final int END_OF_PARTITION = -1;
+ private final int INVALID_BUFFER = -2;
+ private final int UNALLOCATED_FRAME = -3;
+
+ private final static int INT_SIZE = 4;
+
+ private RunFileWriter[] spilledPartitionWriters;
+
+ private final IHyracksTaskContext ctx;
+
+ private final int framesLimit;
+
+ private final int frameSize;
+
+ private final int[] keys, storedKeys;
+
+ private final ITuplePartitionComputer aggregatePartitionComputer;
+
+ private final IAggregatorDescriptor aggregator, merger;
+
+ private AggregateState[] partitionAggregateStates;
+
+ private AggregateState mergeState;
+
+ private final IFrameWriter finalWriter;
+
+ private final FrameTupleAccessor inputAccessor, partitionAccessor;
+
+ private final TupleInFrameAccessor hashtableEntryAccessor;
+
+ private final FrameTupleAppender appender;
+
+ private ByteBuffer[] frames;
+
+ protected final int numOfPartitions;
+
+ protected final int hashtableSize;
+
+ protected int[] partitionBeginningFrame, partitionCurrentFrame, framesNext, partitionSizeInTuple,
+ partitionSizeInFrame, hashtableEntrySize, framesOffset;
+
+ // hashtable headers
+ private int[] hashtableHeaders;
+
+ private int nextFreeFrameIndex, freeFramesCounter;
+
+ /**
+ * Status of the partition (0: resident partition, 1: spilled partition)
+ */
+ protected BitSet partitionStatus;
+
+ /**
+ * Tuple builder for aggregation
+ */
+ private ArrayTupleBuilder internalTupleBuilderForInsert, internalTupleBuilderForFlush;
+
+ private int hashtableSizeForPartition;
+
+ private ByteBuffer outputBuffer;
+
+ private FrameTupleAccessor mergeAccessor;
+
+ private RecordDescriptor internalRecordDescriptor;
+
+ private ITuplePartitionComputer mergePartitionComputer;
+
+ private final IBinaryComparator[] comparators;
+
+ /*
+ * For instrumenting
+ */
+
+ private int residentPartitionCount = 0, spilledPartitionCount = 0, reloadSpilledPartitionCount = 0;
+
+ private static final Logger LOGGER = Logger.getLogger(AbstractImprovedHybridHashGrouper.class.getSimpleName());
+
+ public AbstractImprovedHybridHashGrouper(IHyracksTaskContext ctx, int framesLimit, int[] keys, int[] storedKeys,
+ int targetNumOfPartitions, int hashtableSize, RecordDescriptor inRecDesc, RecordDescriptor outRecDesc,
+ IBinaryComparator[] comparators, ITuplePartitionComputer aggregateHashtablePartitionComputer,
+ ITuplePartitionComputer mergeHashtablePartitionComputer, IAggregatorDescriptorFactory aggregatorFactory,
+ IAggregatorDescriptorFactory mergerFactory, IFrameWriter finalOutputWriter) throws HyracksDataException {
+ this.framesLimit = framesLimit;
+ this.keys = keys;
+ this.hashtableSize = hashtableSize;
+ this.storedKeys = storedKeys;
+ this.aggregatePartitionComputer = aggregateHashtablePartitionComputer;
+ this.mergePartitionComputer = mergeHashtablePartitionComputer;
+ this.finalWriter = finalOutputWriter;
+ this.ctx = ctx;
+ this.frameSize = ctx.getFrameSize();
+ this.comparators = comparators;
+
+ // initialize the accessors and appenders
+ this.internalRecordDescriptor = outRecDesc;
+
+ if (keys.length >= outRecDesc.getFields().length) {
+ // for the case of zero-aggregations
+ ISerializerDeserializer<?>[] fields = outRecDesc.getFields();
+ ITypeTraits[] types = outRecDesc.getTypeTraits();
+ ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+ for (int i = 0; i < fields.length; i++)
+ newFields[i] = fields[i];
+ ITypeTraits[] newTypes = null;
+ if (types != null) {
+ newTypes = new ITypeTraits[types.length + 1];
+ for (int i = 0; i < types.length; i++)
+ newTypes[i] = types[i];
+ }
+ internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+ }
+
+ this.inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+ this.partitionAccessor = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+ this.hashtableEntryAccessor = new TupleInFrameAccessor(internalRecordDescriptor);
+ this.appender = new FrameTupleAppender(ctx.getFrameSize());
+ this.internalTupleBuilderForInsert = new ArrayTupleBuilder(internalRecordDescriptor.getFieldCount());
+ this.internalTupleBuilderForFlush = new ArrayTupleBuilder(internalRecordDescriptor.getFieldCount());
+
+ // initialize the aggregator and merger
+ this.aggregator = aggregatorFactory.createAggregator(ctx, inRecDesc, outRecDesc, keys, storedKeys);
+ this.merger = mergerFactory.createAggregator(ctx, outRecDesc, outRecDesc, storedKeys, storedKeys);
+
+ // set the free frames linked list
+ // - free frame starts from beginning
+ nextFreeFrameIndex = 0;
+ // - next links for frames
+ frames = new ByteBuffer[framesLimit];
+ framesNext = new int[framesLimit];
+ framesOffset = new int[framesLimit];
+ for (int i = 0; i < framesNext.length; i++) {
+ framesNext[i] = UNALLOCATED_FRAME;
+ framesOffset[i] = 0;
+ }
+ freeFramesCounter = framesLimit;
+
+ int hashtableHeaderCount = getHeaderPageCount(hashtableSize, ctx.getFrameSize());
+ if (hashtableHeaderCount > framesLimit) {
+ throw new HyracksDataException("Not enough frames to initialize the hash table");
+ }
+
+ this.numOfPartitions = getNumOfPartitions(targetNumOfPartitions, hashtableHeaderCount, ctx.getFrameSize());
+
+ // initialize the aggregate state
+ this.partitionAggregateStates = new AggregateState[numOfPartitions];
+ for (int i = 0; i < numOfPartitions; i++) {
+ partitionAggregateStates[i] = aggregator.createAggregateStates();
+ }
+
+ // initialize the header pages: assign a block of frames for hashtable headers
+ this.hashtableHeaders = new int[hashtableHeaderCount];
+ int headerFrameIndex = allocateFrame();
+ this.hashtableHeaders[0] = headerFrameIndex;
+ for (int i = 1; i < hashtableHeaderCount; i++) {
+ int newHeaderFrameIndex = allocateFrameForOverflowing(headerFrameIndex);
+ this.hashtableHeaders[i] = newHeaderFrameIndex;
+ headerFrameIndex = newHeaderFrameIndex;
+ }
+ for (int i = 0; i < hashtableHeaderCount; i++) {
+ for (int j = 0; j < frameSize; j += INT_SIZE) {
+ frames[hashtableHeaders[i]].putInt(j, -1);
+ }
+ }
+
+ // initialize frame for each partition
+
+ hashtableSizeForPartition = hashtableSize / numOfPartitions;
+
+ partitionStatus = new BitSet(numOfPartitions);
+ partitionBeginningFrame = new int[numOfPartitions];
+ partitionCurrentFrame = new int[numOfPartitions];
+ partitionSizeInTuple = new int[numOfPartitions];
+ partitionSizeInFrame = new int[numOfPartitions];
+ hashtableEntrySize = new int[numOfPartitions];
+ this.spilledPartitionWriters = new RunFileWriter[numOfPartitions];
+ for (int i = 0; i < numOfPartitions; i++) {
+ int newFrame = allocateFrame();
+ if (newFrame == NO_MORE_FREE_BUFFER) {
+ throw new HyracksDataException("Not enough memory for the partition plan");
+ }
+ partitionBeginningFrame[i] = newFrame;
+ partitionCurrentFrame[i] = newFrame;
+ partitionSizeInTuple[i] = 0;
+ partitionSizeInFrame[i] = 1;
+ framesNext[newFrame] = END_OF_PARTITION;
+ hashtableEntrySize[i] = hashtableSizeForPartition;
+ }
+
+ // for output
+ outputBuffer = frames[allocateFrame()];
+ LOGGER.info("Initialized Hybrid-hash-grouper: " + numOfPartitions + " Partitions, " + hashtableSize
+ + " Hashvals, " + hashtableHeaderCount + " Header Pages");
+ }
+
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inputAccessor.reset(buffer);
+ int tupleCount = inputAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ int pid = aggregatePartitionComputer.partition(inputAccessor, i, numOfPartitions);
+ int hid = aggregatePartitionComputer.partition(inputAccessor, i, hashtableSize);
+ insert(inputAccessor, i, pid, hid);
+ }
+ }
+
+ private void insert(FrameTupleAccessor accessor, int tupleIndex, int pid, int hid) throws HyracksDataException {
+ if (!partitionStatus.get(pid)) {
+ insertResidentPartitionRecord(keys, accessor, tupleIndex, pid, hid, aggregator,
+ partitionAggregateStates[pid], true);
+ } else {
+ // spilled partition
+ // Spilled partition: insert into the partition only, and flush when necessary
+ appender.reset(frames[partitionCurrentFrame[pid]], false);
+ // - build the aggregation value
+ internalTupleBuilderForInsert.reset();
+ for (int k = 0; k < keys.length; k++) {
+ internalTupleBuilderForInsert.addField(accessor, tupleIndex, keys[k]);
+ }
+ aggregator.init(internalTupleBuilderForInsert, accessor, tupleIndex, partitionAggregateStates[pid]);
+ if (!appender.appendSkipEmptyField(internalTupleBuilderForInsert.getFieldEndOffsets(),
+ internalTupleBuilderForInsert.getByteArray(), 0, internalTupleBuilderForInsert.getSize())) {
+ flushSpilledPartition(pid);
+ partitionSizeInFrame[pid]++;
+ appender.reset(frames[partitionCurrentFrame[pid]], true);
+ if (!appender.appendSkipEmptyField(internalTupleBuilderForInsert.getFieldEndOffsets(),
+ internalTupleBuilderForInsert.getByteArray(), 0, internalTupleBuilderForInsert.getSize())) {
+ throw new HyracksDataException("Failed to insert a tuple into a spilled partition.");
+ }
+ }
+
+ partitionSizeInTuple[pid]++;
+ }
+ }
+
+ private void insertResidentPartitionRecord(int[] recKeys, FrameTupleAccessor accessor, int tupleIndex, int pid,
+ int hid, IAggregatorDescriptor grouper, AggregateState groupState, boolean allowSpill)
+ throws HyracksDataException {
+ // resident partition
+ // - find the header record
+ int headerFrameIndex = hid * 2 * INT_SIZE / frameSize;
+ int headerFrameOffset = hid * 2 * INT_SIZE % frameSize;
+ int entryFrameIndex = -1, entryFrameOffset = -1;
+ int pointerFrameIndex = hashtableHeaders[headerFrameIndex], pointerFrameOffset = headerFrameOffset;
+ if (headerFrameIndex >= 0) {
+ entryFrameIndex = frames[pointerFrameIndex].getInt(headerFrameOffset);
+ entryFrameOffset = frames[pointerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
+
+ do {
+ if (entryFrameIndex < 0) {
+ break;
+ }
+ int actualFrameIndex = getHashtablePartitionFrame(pid, entryFrameIndex);
+ ByteBuffer bufferContainsNextEntry = frames[actualFrameIndex];
+
+ hashtableEntryAccessor.reset(bufferContainsNextEntry, entryFrameOffset);
+ int c = compare(recKeys, accessor, tupleIndex, storedKeys, hashtableEntryAccessor);
+ if (c == 0) {
+ break;
+ }
+ // move to the next entry along the linked list
+ pointerFrameIndex = actualFrameIndex;
+ pointerFrameOffset = entryFrameOffset + hashtableEntryAccessor.getTupleLength();
+
+ entryFrameIndex = bufferContainsNextEntry.getInt(pointerFrameOffset);
+ entryFrameOffset = bufferContainsNextEntry.getInt(pointerFrameOffset + INT_SIZE);
+ } while (true);
+ }
+ if (entryFrameIndex >= 0) {
+ // Found the entry; do aggregation
+ grouper.aggregate(accessor, tupleIndex, hashtableEntryAccessor, groupState);
+
+ } else {
+ // No matching entry; do insertion
+ int frameIndexToInsert = partitionSizeInFrame[pid] - 1;
+ int frameOffsetToInsert = framesOffset[partitionCurrentFrame[pid]];
+ // check whether there is enough space for the insertion
+ // - build the aggregation value
+ internalTupleBuilderForInsert.reset();
+ for (int k = 0; k < recKeys.length; k++) {
+ internalTupleBuilderForInsert.addField(accessor, tupleIndex, recKeys[k]);
+ }
+
+ int initLength = internalTupleBuilderForInsert.getSize() + grouper.getInitSize(accessor, tupleIndex);
+ int fieldCount = recKeys.length + grouper.getFieldCount();
+
+ if (frameSize - frameOffsetToInsert < fieldCount * INT_SIZE + initLength + INT_SIZE * 2) {
+
+ int newPartitionFrameIndex = allocateFrameForOverflowing(partitionCurrentFrame[pid]);
+ if (newPartitionFrameIndex == NO_MORE_FREE_BUFFER) {
+ if (allowSpill) {
+ // need to make some space
+ int pidToSpill = selectPartitionToSpill();
+ if (pidToSpill == -1) {
+ throw new HyracksDataException("Cannot allocate memory for a resident partition.");
+ }
+ spillResidentPartition(pidToSpill);
+ insert(accessor, tupleIndex, pid, hid);
+ return;
+ } else {
+ throw new HyracksDataException("Not enough space for processing a resident partition");
+ }
+ }
+
+ int currentPartFrame = partitionCurrentFrame[pid];
+ partitionCurrentFrame[pid] = newPartitionFrameIndex;
+ framesNext[newPartitionFrameIndex] = currentPartFrame;
+ framesOffset[newPartitionFrameIndex] = 0;
+ partitionSizeInFrame[pid]++;
+
+ frameIndexToInsert = partitionSizeInFrame[pid] - 1;
+ frameOffsetToInsert = framesOffset[partitionCurrentFrame[pid]];
+ if (frameSize - frameOffsetToInsert < fieldCount * INT_SIZE + initLength + INT_SIZE * 2) {
+ throw new HyracksDataException("Failed to allocate memory for an aggregation tuple.");
+ }
+ }
+
+ frames[pointerFrameIndex].putInt(pointerFrameOffset, frameIndexToInsert);
+ frames[pointerFrameIndex].putInt(pointerFrameOffset + INT_SIZE, frameOffsetToInsert);
+
+ grouper.init(internalTupleBuilderForInsert, accessor, tupleIndex, groupState);
+
+ for (int i = 0; i < internalTupleBuilderForInsert.getFieldEndOffsets().length; i++) {
+ frames[partitionCurrentFrame[pid]].putInt(frameOffsetToInsert + i * INT_SIZE,
+ internalTupleBuilderForInsert.getFieldEndOffsets()[i]);
+ }
+ System.arraycopy(internalTupleBuilderForInsert.getByteArray(), 0,
+ frames[partitionCurrentFrame[pid]].array(),
+ frameOffsetToInsert + internalTupleBuilderForInsert.getFieldEndOffsets().length * INT_SIZE,
+ internalTupleBuilderForInsert.getSize());
+
+ int linkedListRefOffset = frameOffsetToInsert + internalTupleBuilderForInsert.getFieldEndOffsets().length
+ * INT_SIZE + internalTupleBuilderForInsert.getSize();
+
+ frames[partitionCurrentFrame[pid]].putInt(linkedListRefOffset, -1);
+ frames[partitionCurrentFrame[pid]].putInt(linkedListRefOffset + INT_SIZE, -1);
+
+ // update the frame offset
+ framesOffset[partitionCurrentFrame[pid]] += internalTupleBuilderForInsert.getFieldEndOffsets().length
+ * INT_SIZE + internalTupleBuilderForInsert.getSize() + 2 * INT_SIZE;
+
+ partitionSizeInTuple[pid]++;
+ }
+ }
+
+ public void close() throws HyracksDataException {
+ // flush all resident partitions
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (!partitionStatus.get(i)) {
+ flushResidentPartition(i, finalWriter, aggregator, partitionAggregateStates[i], false);
+ residentPartitionCount++;
+ } else {
+ if (frames[partitionCurrentFrame[i]].getInt(frames[partitionCurrentFrame[i]].capacity() - 4) > 0)
+ flushSpilledPartition(i);
+ recycleFrame(partitionCurrentFrame[i]);
+ partitionCurrentFrame[i] = UNALLOCATED_FRAME;
+ partitionSizeInFrame[i]++;
+ if (spilledPartitionWriters[i] != null) {
+ spilledPartitionWriters[i].close();
+ }
+ spilledPartitionCount++;
+ }
+ }
+
+ partitionAggregateStates = null;
+
+ if (this.mergeState == null)
+ this.mergeState = merger.createAggregateStates();
+
+ // clean up the header frames
+ for (int i = 0; i < hashtableHeaders.length; i++) {
+ recycleFrame(hashtableHeaders[i]);
+ }
+
+ hashtableHeaders = null;
+
+ // Select spilled partition to reload
+ RunFileReader runReader;
+ int runFileBufferIndex = allocateFrame();
+ ByteBuffer runFileBuffer = frames[runFileBufferIndex];
+
+ int newHeadPagesCount = getHeaderPageCount(hashtableSize / numOfPartitions, frameSize);
+
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (partitionStatus.get(i)) {
+ mergeState.reset();
+ // check if the partition can be maintained in the main memory
+ if (partitionSizeInFrame[i] > 0 && partitionSizeInFrame[i] + newHeadPagesCount < freeFramesCounter) {
+ reloadSpilledPartitionCount++;
+ LOGGER.warning("HybridHashGrouper-Merge " + partitionSizeInFrame[i] + " " + partitionSizeInTuple[i]);
+
+ partitionCurrentFrame[i] = allocateFrame();
+ if (partitionCurrentFrame[i] < 0) {
+ throw new HyracksDataException("Cannot allocate frame for merging.");
+ }
+ framesOffset[partitionCurrentFrame[i]] = 0;
+ // reset the size counter
+ partitionSizeInFrame[i] = 1;
+ partitionSizeInTuple[i] = 0;
+
+ framesNext[partitionCurrentFrame[i]] = END_OF_PARTITION;
+
+ // initialize the hash table header pages
+ hashtableHeaders = new int[newHeadPagesCount];
+ int headerFrameIndex = allocateFrame();
+ this.hashtableHeaders[0] = headerFrameIndex;
+ for (int k = 0; k < frameSize; k += INT_SIZE) {
+ frames[this.hashtableHeaders[0]].putInt(k, -1);
+ }
+ for (int j = 1; j < hashtableHeaders.length; j++) {
+ int newHeaderFrameIndex = allocateFrameForOverflowing(headerFrameIndex);
+ this.hashtableHeaders[j] = newHeaderFrameIndex;
+ headerFrameIndex = newHeaderFrameIndex;
+ for (int k = 0; k < frameSize; k += INT_SIZE) {
+ frames[newHeaderFrameIndex].putInt(k, -1);
+ }
+ }
+
+ runReader = spilledPartitionWriters[i].createReader();
+ runReader.open();
+ if (mergeAccessor == null) {
+ mergeAccessor = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+ }
+ while (runReader.nextFrame(runFileBuffer)) {
+ mergeAccessor.reset(runFileBuffer);
+ int tupleCount = mergeAccessor.getTupleCount();
+ for (int j = 0; j < tupleCount; j++) {
+ merge(mergeAccessor, j, i);
+ }
+ }
+ // close the run file reader and writer
+ runReader.close();
+ spilledPartitionWriters[i].close();
+ spilledPartitionWriters[i] = null;
+ // flush the in-memory hash table into the output
+ flushResidentPartition(i, finalWriter, merger, mergeState, false);
+ }
+ }
+ }
+ LOGGER.warning("HybridHashGrouper " + residentPartitionCount + " " + spilledPartitionCount + " "
+ + reloadSpilledPartitionCount);
+ }
+
+ private void merge(FrameTupleAccessor accessor, int tupleIndex, int pid) throws HyracksDataException {
+ int hid = mergePartitionComputer.partition(accessor, tupleIndex, hashtableEntrySize[pid]);
+ insertResidentPartitionRecord(storedKeys, accessor, tupleIndex, pid, hid, merger, mergeState, false);
+ }
+
+ protected void spillResidentPartition(int pid) throws HyracksDataException {
+ // flush the data partition
+ RunFileWriter writer = spilledPartitionWriters[pid];
+ if (writer == null) {
+ writer = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
+ AbstractImprovedHybridHashGrouper.class.getSimpleName()), ctx.getIOManager());
+ writer.open();
+ spilledPartitionWriters[pid] = writer;
+ }
+ flushResidentPartition(pid, writer, aggregator, partitionAggregateStates[pid], true);
+ partitionStatus.set(pid);
+ }
+
+ protected void flushSpilledPartition(int pid) throws HyracksDataException {
+ RunFileWriter writer = spilledPartitionWriters[pid];
+ if (writer == null) {
+ writer = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
+ AbstractImprovedHybridHashGrouper.class.getName()), ctx.getIOManager());
+ writer.open();
+ spilledPartitionWriters[pid] = writer;
+ }
+ flushSpilledPartitionToWriter(pid, writer);
+ }
+
+ private void flushSpilledPartitionToWriter(int pid, IFrameWriter writer) throws HyracksDataException {
+ int spillPartFrame = partitionCurrentFrame[pid];
+ int firstPartFrame = spillPartFrame;
+ appender.reset(outputBuffer, true);
+ while (spillPartFrame != END_OF_PARTITION) {
+ partitionAccessor.reset(frames[spillPartFrame]);
+ int tupleCount = partitionAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ internalTupleBuilderForFlush.reset();
+ for (int k = 0; k < storedKeys.length; k++) {
+ internalTupleBuilderForFlush.addField(partitionAccessor, i, storedKeys[k]);
+ }
+ aggregator.outputPartialResult(internalTupleBuilderForFlush, partitionAccessor, i,
+ partitionAggregateStates[pid]);
+ if (!appender.appendSkipEmptyField(internalTupleBuilderForFlush.getFieldEndOffsets(),
+ internalTupleBuilderForFlush.getByteArray(), 0, internalTupleBuilderForFlush.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ appender.reset(outputBuffer, true);
+ if (!appender.appendSkipEmptyField(internalTupleBuilderForFlush.getFieldEndOffsets(),
+ internalTupleBuilderForFlush.getByteArray(), 0, internalTupleBuilderForFlush.getSize())) {
+ throw new HyracksDataException("Failed to flush a spilled partition.");
+ }
+ }
+ }
+
+ framesOffset[spillPartFrame] = 0;
+ int nextFrame = framesNext[spillPartFrame];
+ if (spillPartFrame != firstPartFrame) {
+ recycleFrame(spillPartFrame);
+ }
+ spillPartFrame = nextFrame;
+ }
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ frames[firstPartFrame].clear();
+ frames[firstPartFrame].putInt(frames[firstPartFrame].capacity() - 4, 0);
+ framesNext[firstPartFrame] = END_OF_PARTITION;
+ // reset the aggregate state
+ partitionAggregateStates[pid].reset();
+ }
+
+ private void flushResidentPartition(int pid, IFrameWriter writer, IAggregatorDescriptor grouper,
+ AggregateState groupState, boolean isPartial) throws HyracksDataException {
+ int residentPartFrame = partitionCurrentFrame[pid];
+ int firstPartFrame = residentPartFrame;
+ appender.reset(outputBuffer, true);
+ while (residentPartFrame != END_OF_PARTITION) {
+ int frameOffset = 0;
+
+ while (frameOffset < framesOffset[residentPartFrame]) {
+ hashtableEntryAccessor.reset(frames[residentPartFrame], frameOffset);
+ int tupleLength = hashtableEntryAccessor.getTupleLength();
+ int fieldOffsetLength = hashtableEntryAccessor.getFieldCount() * INT_SIZE;
+ internalTupleBuilderForFlush.reset();
+
+ for (int k = 0; k < storedKeys.length; k++) {
+ internalTupleBuilderForFlush.addField(frames[residentPartFrame].array(), frameOffset
+ + fieldOffsetLength + hashtableEntryAccessor.getFieldStartOffset(k),
+ hashtableEntryAccessor.getFieldLength(k));
+ }
+
+ if (isPartial)
+ grouper.outputPartialResult(internalTupleBuilderForFlush, frames[residentPartFrame].array(),
+ frameOffset, tupleLength, hashtableEntryAccessor.getFieldCount(), INT_SIZE, groupState);
+ else
+ grouper.outputFinalResult(internalTupleBuilderForFlush, frames[residentPartFrame].array(),
+ frameOffset, tupleLength, hashtableEntryAccessor.getFieldCount(), INT_SIZE, groupState);
+
+ if (!appender.append(internalTupleBuilderForFlush.getFieldEndOffsets(),
+ internalTupleBuilderForFlush.getByteArray(), 0, internalTupleBuilderForFlush.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ appender.reset(outputBuffer, true);
+ if (!appender.append(internalTupleBuilderForFlush.getFieldEndOffsets(),
+ internalTupleBuilderForFlush.getByteArray(), 0, internalTupleBuilderForFlush.getSize())) {
+ throw new HyracksDataException("Failed to flush a spilled partition.");
+ }
+ }
+
+ frameOffset += tupleLength + INT_SIZE * 2;
+ }
+
+ // release the frame
+ framesOffset[residentPartFrame] = 0;
+ int nextFrame = framesNext[residentPartFrame];
+ if (residentPartFrame != firstPartFrame) {
+ recycleFrame(residentPartFrame);
+ }
+ residentPartFrame = nextFrame;
+ }
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ frames[firstPartFrame].clear();
+ frames[firstPartFrame].putInt(frames[firstPartFrame].capacity() - 4, 0);
+ framesNext[firstPartFrame] = END_OF_PARTITION;
+ }
+
+ /**
+ * Get the number of header pages for the built-in hashtable.
+ * <p/>
+ * The number of header pages for the hashtable is calculated by dividing the total memory usage of all hashtable
+ * keys over the frame size. Each hashtable key takes two integer-length fields, indicating the frame index and the
+ * tuple index of the first entry with this hash key value.
+ * <p/>
+ *
+ * @param hashtableSize
+ * @param frameSize
+ * @return
+ */
+ private int getHeaderPageCount(int hashtableSize, int frameSize) {
+ int hashtableHeaderPageResidual = (hashtableSize % frameSize) * (INT_SIZE * 2) % frameSize == 0 ? 0 : 1;
+ return hashtableSize / frameSize * (INT_SIZE * 2) + hashtableHeaderPageResidual;
+ }
+
+ /**
+ * Get the number of partitions.
+ * <p/>
+ * The number of partition is decided by the target number of partitions, and also the size of the hashtable. The
+ * goal is to contain the whole hash table header, and for each partition at least one page can be assigned.
+ * <p/>
+ * In case that the target number of partitions is too large, or the available frame size is too small, the number
+ * of partition will be adjusted to be the available frames except for the headers and other reversed frames.
+ * <p/>
+ *
+ * @param targetNumOfPartitions
+ * @param hashtableHeaderPageCount
+ * @param frameSize
+ * @return
+ */
+ private int getNumOfPartitions(int targetNumOfPartitions, int hashtableHeaderPageCount, int frameSize) {
+ if (targetNumOfPartitions + hashtableHeaderPageCount + getReservedFramesCount() > framesLimit) {
+ return framesLimit - getReservedFramesCount() - hashtableHeaderPageCount;
+ } else {
+ return targetNumOfPartitions;
+ }
+ }
+
+ private void recycleFrame(int frameIdx) {
+ frames[frameIdx].clear();
+ framesOffset[frameIdx] = 0;
+ frames[frameIdx].putInt(frames[frameIdx].capacity() - 4, 0);
+ int oldFreeHead = nextFreeFrameIndex;
+ nextFreeFrameIndex = frameIdx;
+ framesNext[nextFreeFrameIndex] = oldFreeHead;
+ freeFramesCounter++;
+ }
+
+ /**
+ * Allocate frame in case of the given overflowing frame. The allocated frame
+ * will become the head of the linked list, and its next pointer points to the
+ * overflowing frame.
+ */
+ @Override
+ public int allocateFrameForOverflowing(int frameIndex) throws HyracksDataException {
+ int newFrameIndex = allocateFrame();
+ if (newFrameIndex < 0)
+ return newFrameIndex;
+ framesNext[newFrameIndex] = frameIndex;
+ return newFrameIndex;
+ }
+
+ /**
+ * Allocate a free frame.
+ * <p/>
+ * If {@link #nextFreeFrameIndex} points to an unallocated frame slot, the frame is initialized and returned. Then
+ * the next frame of the free frame replaces the first free page {@link #nextFreeFrameIndex}.
+ * <p/>
+ * When there is no more free frame available, {@link #nextFreeFrameIndex} will point to
+ * {@link #NO_MORE_FREE_BUFFER}.
+ * <p/>
+ *
+ * @return
+ * @throws HyracksDataException
+ */
+ @Override
+ public int allocateFrame() throws HyracksDataException {
+ if (nextFreeFrameIndex != NO_MORE_FREE_BUFFER) {
+ int freeFrameIndex = nextFreeFrameIndex;
+ if (frames[freeFrameIndex] == null) {
+ frames[freeFrameIndex] = ctx.allocateFrame();
+ }
+ int oldNext = framesNext[freeFrameIndex];
+ framesNext[freeFrameIndex] = INVALID_BUFFER;
+ if (oldNext == UNALLOCATED_FRAME) {
+ nextFreeFrameIndex++;
+ if (nextFreeFrameIndex == framesLimit) { // No more free buffer
+ nextFreeFrameIndex = NO_MORE_FREE_BUFFER;
+ }
+ } else {
+ nextFreeFrameIndex = oldNext;
+ }
+ (frames[freeFrameIndex]).clear();
+
+ freeFramesCounter--;
+ if (freeFramesCounter < 0) {
+ throw new HyracksDataException("Memory underflow!");
+ }
+ return freeFrameIndex;
+ } else {
+ return NO_MORE_FREE_BUFFER; // A partitions needs to be spilled (if feasible)
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.group.IFrameAllocator#getFrame(int)
+ */
+ @Override
+ public byte[] getFrame(int frameIndex) throws HyracksDataException {
+ if (frameIndex < 0 || frameIndex > frames.length) {
+ return null;
+ }
+ return frames[frameIndex].array();
+ }
+
+ private int compare(int[] keys0, FrameTupleAccessor accessor0, int tIndex0, int[] keys1,
+ TupleInFrameAccessor accessor1) {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = accessor1.getTupleStartOffset();
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ for (int i = 0; i < keys0.length; ++i) {
+ int fIdx0 = keys0[i];
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fIdx1 = keys1[i];
+ int fStart1 = accessor1.getFieldStartOffset(fIdx1);
+ int fEnd1 = accessor1.getFieldEndOffset(fIdx1);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+ .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ private int getHashtablePartitionFrame(int pid, int frameIndex) {
+ if (frameIndex > partitionSizeInFrame[pid] - 1) {
+ return -1;
+ }
+ int fIdx = partitionCurrentFrame[pid];
+ for (int i = partitionSizeInFrame[pid] - 1; i > frameIndex; i--) {
+ fIdx = framesNext[fIdx];
+ }
+ return fIdx;
+ }
+
+ public RunFileWriter getSpilledRunWriter(int pid) {
+ if (pid >= 0 && pid < numOfPartitions) {
+ return spilledPartitionWriters[pid];
+ }
+ return null;
+ }
+
+ public int getPartitionSizeInFrame(int pid) {
+ if (pid >= 0 && pid < numOfPartitions) {
+ return partitionSizeInFrame[pid];
+ }
+ return -1;
+ }
+
+ public boolean hasSpilledPartition() {
+ boolean hasSpilledPart = false;
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (spilledPartitionWriters[i] != null) {
+ hasSpilledPart = true;
+ break;
+ }
+ }
+ return hasSpilledPart;
+ }
+
+ public int getPartitionMaxFrameSize() {
+ int maxSize = 0;
+ for (int i = partitionStatus.nextSetBit(0); i >= 0; i = partitionStatus.nextSetBit(i + 1)) {
+ if (partitionSizeInFrame[i] > maxSize) {
+ maxSize = partitionSizeInFrame[i];
+ }
+ }
+ return maxSize;
+ }
+
+ public int getNumOfPartitions() {
+ return numOfPartitions;
+ }
+
+ abstract public int getReservedFramesCount();
+
+ abstract public int selectPartitionToSpill();
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperAggregate.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperAggregate.java
new file mode 100644
index 0000000..6897505
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperAggregate.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public class ExternalHashGrouperAggregate {
+
+ private static final Logger LOGGER = Logger.getLogger(ExternalHashGrouperAggregate.class.getName());
+
+ private final int[] keyFields;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+
+ private final int framesLimit;
+ private final ISpillableTableFactory spillableTableFactory;
+ private final boolean isOutputSorted;
+
+ private final RecordDescriptor inRecDesc, outRecDesc;
+
+ private final IHyracksTaskContext ctx;
+
+ private final boolean skipSort;
+
+ private final boolean skipRun;
+
+ private final int tableSize;
+
+ /*
+ * For aggregate phase
+ */
+
+ LinkedList<RunFileReader> runs;
+ ISpillableTable gTable;
+ FrameTupleAccessor inFrameAccessor;
+
+ /*
+ * For instrumenting
+ */
+ long ioCounter = 0;
+ long sortTimeCounter = 0;
+ long aggTimeStart = 0;
+ long frameFlushRequest = 0;
+ long frameProcessingTime = 0;
+ long frameFlushTime = 0;
+
+ /*
+ * For output
+ */
+ private final IFrameWriter finalWriter;
+
+ /**
+ * @param ctx
+ * @param keyFields
+ * @param framesLimit
+ * @param tableSize
+ * @param comparatorFactories
+ * @param firstNormalizerFactory
+ * @param aggregatorFactory
+ * @param mergerFactory
+ * @param inRecDesc
+ * @param outRecDesc
+ * @param spillableTableFactory
+ * @param isOutputSorted
+ */
+ public ExternalHashGrouperAggregate(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
+ IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecDesc, RecordDescriptor outRecDesc,
+ ISpillableTableFactory spillableTableFactory, IFrameWriter finalWriter, boolean isOutputSorted) {
+ this.keyFields = keyFields;
+ this.comparatorFactories = comparatorFactories;
+ this.firstNormalizerFactory = firstNormalizerFactory;
+ this.aggregatorFactory = aggregatorFactory;
+ this.framesLimit = framesLimit;
+ this.spillableTableFactory = spillableTableFactory;
+ this.isOutputSorted = isOutputSorted;
+ this.tableSize = tableSize;
+ this.inRecDesc = inRecDesc;
+ this.outRecDesc = outRecDesc;
+ this.ctx = ctx;
+ this.finalWriter = finalWriter;
+
+ this.skipSort = false;
+ this.skipRun = false;
+ }
+
+ public void initGrouperForAggregate() throws HyracksDataException {
+ runs = new LinkedList<RunFileReader>();
+ gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories, firstNormalizerFactory,
+ aggregatorFactory, inRecDesc, outRecDesc, tableSize, framesLimit);
+ gTable.reset();
+ inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+ }
+
+ public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
+ inFrameAccessor.reset(buffer);
+ int tupleCount = inFrameAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ if (!gTable.insert(inFrameAccessor, i)) {
+ flushFramesToRun();
+ }
+ }
+ }
+
+ private void flushFramesToRun() throws HyracksDataException {
+ frameFlushRequest++;
+ long frameFlushTimer = System.currentTimeMillis();
+
+ if (!skipSort) {
+ long sortStart = System.currentTimeMillis();
+ gTable.sortFrames();
+ sortTimeCounter += System.currentTimeMillis() - sortStart;
+ }
+
+ if (!skipRun) {
+ FileReference runFile;
+ try {
+ runFile = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class.getSimpleName());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+ writer.open();
+ try {
+ gTable.flushFrames(writer, true);
+ } catch (Exception ex) {
+ throw new HyracksDataException(ex);
+ } finally {
+ writer.close();
+ ioCounter += writer.getFileSize();
+ }
+ runs.add(((RunFileWriter) writer).createReader());
+ }
+ gTable.reset();
+ frameFlushTime = System.currentTimeMillis() - frameFlushTimer;
+ }
+
+ public void finishAggregation() throws HyracksDataException {
+ gTable.finishup(isOutputSorted);
+ if (gTable.getFrameCount() >= 0 && runs.size() > 0) {
+ flushFramesToRun();
+ gTable.close();
+ gTable = null;
+ } else if (gTable.getFrameCount() >= 0 && runs.size() <= 0) {
+ // Write the output
+ gTable.flushFrames(finalWriter, false);
+ }
+ LOGGER.warning("[C]Hybrid AggregateActivity - RunIO " + ioCounter);
+ LOGGER.warning("[C]Hybrid AggregateActivity - FlushFrames " + frameFlushRequest);
+ LOGGER.warning("[T]Hybrid AggregateActivity - Sort " + sortTimeCounter);
+ LOGGER.warning("[T]Hybrid AggregateActivity - ProcessFrames " + frameProcessingTime);
+ LOGGER.warning("[T]Hybrid AggregateActivity - FlushFrames " + frameFlushTime);
+ LOGGER.warning("[T]Hybrid AggregateActivity " + (System.currentTimeMillis() - aggTimeStart));
+ }
+
+ public LinkedList<RunFileReader> getRunReaders() {
+ return runs;
+ }
+
+ public ISpillableTable getGroupTable() {
+ return gTable;
+ }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperMerge.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperMerge.java
new file mode 100644
index 0000000..5cdec97
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGrouperMerge.java
@@ -0,0 +1,461 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+public class ExternalHashGrouperMerge {
+
+ private static final Logger LOGGER = Logger.getLogger(ExternalHashGrouperMerge.class.getName());
+
+ private final int[] keyFields;
+ private final IBinaryComparator[] comparators;
+
+ private final IAggregatorDescriptor aggregator;
+ private final AggregateState aggregateState;
+
+ private final int framesLimit;
+
+ private final boolean isOutputSorted;
+
+ private final RecordDescriptor inRecDesc;
+
+ private final IHyracksTaskContext ctx;
+
+ private final ArrayTupleBuilder tupleBuilder;
+
+ private final IFrameWriter outputWriter;
+
+ List<ByteBuffer> inFrames;
+ ByteBuffer outFrame, writerFrame;
+ FrameTupleAppender outAppender, writerAppender;
+ LinkedList<RunFileReader> runs;
+ ArrayTupleBuilder finalTupleBuilder;
+ FrameTupleAccessor outFrameAccessor;
+ int[] currentFrameIndexInRun, currentRunFrames;
+ int runFrameLimit = 1;
+
+ // For instrumenting
+ long ioCounter;
+
+ public ExternalHashGrouperMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, boolean isOutputSorted, IFrameWriter outputWriter)
+ throws HyracksDataException {
+ this.ctx = ctx;
+ this.framesLimit = framesLimit;
+
+ this.keyFields = keyFields;
+ this.comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparators.length; i++) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ aggregator = aggregatorFactory.createAggregator(ctx, inRecDesc, outRecDesc, keyFields, keyFields);
+ aggregateState = aggregator.createAggregateStates();
+
+ this.inRecDesc = inRecDesc;
+ this.isOutputSorted = isOutputSorted;
+
+ this.tupleBuilder = new ArrayTupleBuilder(inRecDesc.getFieldCount());
+
+ this.outAppender = new FrameTupleAppender(ctx.getFrameSize());
+
+ this.outputWriter = outputWriter;
+
+ this.outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+ }
+
+ public void initialize(ISpillableTable gTable, LinkedList<RunFileReader> runFiles) throws HyracksDataException {
+ long mergeTimer = System.currentTimeMillis();
+ runs = runFiles;
+ outputWriter.open();
+ try {
+ if (runs.size() <= 0) {
+ if (gTable != null) {
+ if (isOutputSorted)
+ gTable.sortFrames();
+ gTable.flushFrames(outputWriter, false);
+ gTable.close();
+ gTable = null;
+ }
+ } else {
+ LOGGER.warning("[C]Hybrid MergeActivity - Runs " + runs.size());
+ runs = new LinkedList<RunFileReader>(runs);
+ inFrames = new ArrayList<ByteBuffer>();
+ outFrame = ctx.allocateFrame();
+ outAppender.reset(outFrame, true);
+ outFrameAccessor.reset(outFrame);
+ while (runs.size() > 0) {
+ try {
+ doPass(runs);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ inFrames.clear();
+ }
+ } catch (Exception e) {
+ outputWriter.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ aggregateState.close();
+ LOGGER.warning("[C]Hybrid MergeActivity - RunIO " + ioCounter);
+ LOGGER.warning("[T]Hybrid MergeActivity " + (System.currentTimeMillis() - mergeTimer));
+ }
+ }
+
+ private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
+ long mergePassTimer = System.currentTimeMillis();
+ long processedRecords = 0;
+ long groupCreated = 0;
+ FileReference newRun = null;
+ IFrameWriter writer = outputWriter;
+ boolean finalPass = false;
+
+ while (inFrames.size() + 2 < framesLimit) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ int runNumber;
+ if (runs.size() + 2 <= framesLimit) {
+ finalPass = true;
+ runFrameLimit = (framesLimit - 2) / runs.size();
+ runNumber = runs.size();
+ } else {
+ runNumber = framesLimit - 2;
+ newRun = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalGroupOperatorDescriptor.class.getSimpleName());
+ writer = new RunFileWriter(newRun, ctx.getIOManager());
+ writer.open();
+ }
+ try {
+ currentFrameIndexInRun = new int[runNumber];
+ currentRunFrames = new int[runNumber];
+ /**
+ * Create file readers for each input run file, only for
+ * the ones fit into the inFrames
+ */
+ RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+ FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+ Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+ ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), inRecDesc, runNumber,
+ comparator);
+ /**
+ * current tuple index in each run
+ */
+ int[] tupleIndices = new int[runNumber];
+
+ for (int i = 0; i < runNumber; i++) {
+ int runIndex = topTuples.peek().getRunid();
+ tupleIndices[runIndex] = 0;
+ // Load the run file
+ runFileReaders[runIndex] = runs.get(runIndex);
+ runFileReaders[runIndex].open();
+
+ currentRunFrames[runIndex] = 0;
+ currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
+ for (int j = 0; j < runFrameLimit; j++) {
+ int frameIndex = currentFrameIndexInRun[runIndex] + j;
+ if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
+ tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ currentRunFrames[runIndex]++;
+ if (j == 0)
+ setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Start merging
+ */
+ while (!topTuples.areRunsExhausted()) {
+ processedRecords++;
+ /**
+ * Get the top record
+ */
+ ReferenceEntry top = topTuples.peek();
+ int tupleIndex = top.getTupleIndex();
+ int runIndex = topTuples.peek().getRunid();
+
+ FrameTupleAccessor fta = top.getAccessor();
+
+ int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+ if (currentTupleInOutFrame < 0
+ || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+
+ groupCreated++;
+
+ /**
+ * Initialize the first output record Reset the
+ * tuple builder
+ */
+
+ tupleBuilder.reset();
+
+ for (int k = 0; k < keyFields.length; k++) {
+ tupleBuilder.addField(fta, tupleIndex, keyFields[k]);
+ }
+
+ aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
+
+ if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ flushOutFrame(writer, finalPass);
+ if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "The partial result is too large to be initialized in a frame.");
+ }
+ }
+
+ } else {
+ /**
+ * if new tuple is in the same group of the
+ * current aggregator do merge and output to the
+ * outFrame
+ */
+
+ aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, aggregateState);
+
+ }
+ tupleIndices[runIndex]++;
+ setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+ }
+
+ if (outAppender.getTupleCount() > 0) {
+ flushOutFrame(writer, finalPass);
+ outAppender.reset(outFrame, true);
+ }
+
+ aggregator.close();
+
+ runs.subList(0, runNumber).clear();
+ /**
+ * insert the new run file into the beginning of the run
+ * file list
+ */
+ if (!finalPass) {
+ runs.add(0, ((RunFileWriter) writer).createReader());
+ ioCounter += ((RunFileWriter) writer).getFileSize();
+ }
+ } finally {
+ if (!finalPass) {
+ writer.close();
+ }
+ }
+ LOGGER.warning("[TCC]Hybrid - Merge - doPass - " + (System.currentTimeMillis() - mergePassTimer) + " - "
+ + processedRecords + " - " + groupCreated);
+ }
+
+ private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
+
+ if (finalTupleBuilder == null) {
+ finalTupleBuilder = new ArrayTupleBuilder(inRecDesc.getFields().length);
+ }
+
+ if (writerFrame == null) {
+ writerFrame = ctx.allocateFrame();
+ }
+
+ if (writerAppender == null) {
+ writerAppender = new FrameTupleAppender(ctx.getFrameSize());
+ }
+ writerAppender.reset(writerFrame, true);
+
+ outFrameAccessor.reset(outFrame);
+
+ for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+
+ finalTupleBuilder.reset();
+
+ for (int k = 0; k < keyFields.length; k++) {
+ finalTupleBuilder.addField(outFrameAccessor, i, keyFields[k]);
+ }
+
+ if (isFinal) {
+
+ aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+
+ } else {
+
+ aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+ }
+
+ if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerAppender.reset(writerFrame, true);
+ if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+ throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
+ }
+ }
+ }
+ if (writerAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerAppender.reset(writerFrame, true);
+ }
+
+ outAppender.reset(outFrame, true);
+ }
+
+ private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
+ int runStart = runIndex * runFrameLimit;
+ boolean existNext = false;
+ if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+ /**
+ * run already closed
+ */
+ existNext = false;
+ } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+ /**
+ * not the last frame for this run
+ */
+ existNext = true;
+ if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex]++;
+ }
+ } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+ /**
+ * the last frame has expired
+ */
+ existNext = true;
+ } else {
+ /**
+ * If all tuples in the targeting frame have been
+ * checked.
+ */
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex] = runStart;
+ /**
+ * read in batch
+ */
+ currentRunFrames[runIndex] = 0;
+ for (int j = 0; j < runFrameLimit; j++) {
+ int frameIndex = currentFrameIndexInRun[runIndex] + j;
+ if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ existNext = true;
+ currentRunFrames[runIndex]++;
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (existNext) {
+ topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
+ } else {
+ topTuples.pop();
+ closeRun(runIndex, runCursors, tupleAccessors);
+ }
+ }
+
+ /**
+ * Close the run file, and also the corresponding readers and
+ * input frame.
+ *
+ * @param index
+ * @param runCursors
+ * @param tupleAccessor
+ * @throws HyracksDataException
+ */
+ private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+ throws HyracksDataException {
+ if (runCursors[index] != null) {
+ runCursors[index].close();
+ runCursors[index] = null;
+ int frameOffset = index * runFrameLimit;
+ for (int j = 0; j < runFrameLimit; j++) {
+ tupleAccessor[frameOffset + j] = null;
+ }
+ }
+ }
+
+ private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldLength(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
+ int l2_start = fta2.getFieldStartOffset(j2, fIdx);
+ int l2_end = fta2.getFieldEndOffset(j2, fIdx);
+ int l2 = l2_end - l2_start;
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+ return new Comparator<ReferenceEntry>() {
+
+ @Override
+ public int compare(ReferenceEntry o1, ReferenceEntry o2) {
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+ int j1 = o1.getTupleIndex();
+ int j2 = o2.getTupleIndex();
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ };
+ }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index 87bc534..b59db51 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -109,5 +109,12 @@
int fieldCount, int fieldSlotLength, AggregateState state) throws HyracksDataException;
public void close();
+
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+
+ public int getFieldCount();
+
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+ AggregateState state) throws HyracksDataException;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index a34fa73..2c73136 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -116,5 +116,7 @@
* Close the field aggregator
*/
public void close();
+
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFrameAllocator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFrameAllocator.java
new file mode 100644
index 0000000..b952c40
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFrameAllocator.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrameAllocator {
+
+ public int allocateFrameForOverflowing(int frameIndex) throws HyracksDataException;
+
+ public int allocateFrame() throws HyracksDataException;
+
+ public byte[] getFrame(int frameIndex) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IImprovedHybridHashGroupFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IImprovedHybridHashGroupFactory.java
new file mode 100644
index 0000000..39cd0e8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IImprovedHybridHashGroupFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IImprovedHybridHashGroupFactory extends Serializable {
+
+ public AbstractImprovedHybridHashGrouper createHybridHashGrouper(IHyracksTaskContext ctx, int framesLimit,
+ int[] keys, int[] storedKeys, int targetNumOfPartitions, int hashtableSize, RecordDescriptor inRecDesc,
+ RecordDescriptor outRecDesc, IBinaryComparator[] comparators,
+ ITuplePartitionComputer aggregateHashtablePartitionComputer,
+ ITuplePartitionComputer mergeHashtablePartitionComputer, IAggregatorDescriptorFactory aggregatorFactory,
+ IAggregatorDescriptorFactory mergerFactory, IFrameWriter finalOutputWriter) throws HyracksDataException;
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupFactory.java
new file mode 100644
index 0000000..d29ce05
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ImprovedHybridHashGroupFactory implements IImprovedHybridHashGroupFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.dataflow.std.group.IImprovedHybridHashGroupFactory#createHybridHashGrouper(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, int, int[], int[], int, int, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator[], edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer, edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer, edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory, edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory, edu.uci.ics.hyracks.api.comm.IFrameWriter)
+ */
+ @Override
+ public AbstractImprovedHybridHashGrouper createHybridHashGrouper(IHyracksTaskContext ctx, int framesLimit,
+ int[] keys, int[] storedKeys, int targetNumOfPartitions, int hashtableSize, RecordDescriptor inRecDesc,
+ RecordDescriptor outRecDesc, IBinaryComparator[] comparators,
+ ITuplePartitionComputer aggregateHashtablePartitionComputer,
+ ITuplePartitionComputer mergeHashtablePartitionComputer, IAggregatorDescriptorFactory aggregatorFactory,
+ IAggregatorDescriptorFactory mergerFactory, IFrameWriter finalOutputWriter) throws HyracksDataException {
+ return new AbstractImprovedHybridHashGrouper(ctx, framesLimit, keys, storedKeys, targetNumOfPartitions,
+ hashtableSize, inRecDesc, outRecDesc, comparators, aggregateHashtablePartitionComputer,
+ mergeHashtablePartitionComputer, aggregatorFactory, mergerFactory, finalOutputWriter) {
+
+ @Override
+ public int getReservedFramesCount() {
+ // use one frame for output buffer
+ return 1;
+ }
+
+ /**
+ * Pick the resident partition with largest size (in frame).
+ */
+ @Override
+ public int selectPartitionToSpill() {
+ int pid = -1;
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (!partitionStatus.get(i)) {
+ // only pick resident partition
+ if (pid < 0) {
+ pid = i;
+ } else {
+ if (partitionSizeInFrame[i] > partitionSizeInFrame[pid])
+ pid = i;
+ }
+ }
+ }
+ return pid;
+ }
+ };
+ }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupOperatorDescriptor.java
new file mode 100644
index 0000000..294366d
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ImprovedHybridHashGroupOperatorDescriptor.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class ImprovedHybridHashGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private final int framesLimit;
+ private final int inputSize;
+ private final double factor;
+ private final int[] keys;
+ private final int hashtableSize;
+
+ private final IBinaryHashFunctionFamily[] hashFunctionFamilies;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+
+ private final IImprovedHybridHashGroupFactory hybridHashGrouperFactory;
+
+ private final IAggregatorDescriptorFactory aggregatorFactory, mergerFactory;
+
+ private final static Logger LOGGER = Logger.getLogger(ImprovedHybridHashGroupOperatorDescriptor.class
+ .getSimpleName());
+
+ private static final double HYBRID_SWITCH_THRESHOLD = 0.8;
+
+ public ImprovedHybridHashGroupOperatorDescriptor(JobSpecification spec, int framesLimit, int inputSize,
+ double factor, int hashtableSize, int[] keys, IBinaryHashFunctionFamily[] hashFunctionFamilies,
+ IBinaryComparatorFactory[] comparatorFactories, IImprovedHybridHashGroupFactory hybridHashGrouperFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+ RecordDescriptor outRecordDescriptor) {
+ super(spec, 1, 1);
+ this.inputSize = inputSize;
+ this.framesLimit = framesLimit;
+ this.factor = factor;
+ this.keys = keys;
+ this.hashtableSize = hashtableSize;
+
+ this.hashFunctionFamilies = hashFunctionFamilies;
+ this.comparatorFactories = comparatorFactories;
+
+ this.hybridHashGrouperFactory = hybridHashGrouperFactory;
+
+ this.aggregatorFactory = aggregatorFactory;
+ this.mergerFactory = mergerFactory;
+
+ recordDescriptors[0] = outRecordDescriptor;
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ /* (non-Javadoc)
+ * @see edu.uci.ics.hyracks.api.dataflow.IActivity#createPushRuntime(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int, int)
+ */
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions)
+ throws HyracksDataException {
+
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+
+ final RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+
+ final ITuplePartitionComputer aggregateTpc = new FieldHashPartitionComputerFamily(keys, hashFunctionFamilies)
+ .createPartitioner(0);
+
+ // Keys for stored records
+ final int[] storedKeys = new int[keys.length];
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keys.length];
+ for (int i = 0; i < keys.length; i++) {
+ storedKeys[i] = i;
+ storedKeySerDeser[i] = inRecDesc.getFields()[keys[i]];
+ }
+
+ final ITuplePartitionComputer mergeTpc = new FieldHashPartitionComputerFamily(storedKeys, hashFunctionFamilies)
+ .createPartitioner(0);
+
+ final RecordDescriptor internalRecordDescriptor;
+
+ if (keys.length >= recordDescriptors[0].getFields().length) {
+ // for the case of zero-aggregations
+ ISerializerDeserializer<?>[] fields = recordDescriptors[0].getFields();
+ ITypeTraits[] types = recordDescriptors[0].getTypeTraits();
+ ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+ for (int i = 0; i < fields.length; i++)
+ newFields[i] = fields[i];
+ ITypeTraits[] newTypes = null;
+ if (types != null) {
+ newTypes = new ITypeTraits[types.length + 1];
+ for (int i = 0; i < types.length; i++)
+ newTypes[i] = types[i];
+ }
+ internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+ } else {
+ internalRecordDescriptor = recordDescriptors[0];
+ }
+
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+ private AbstractImprovedHybridHashGrouper hybridHashGrouper;
+
+ private int numOfPartitions;
+
+ private ByteBuffer runLoadFrame;
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ hybridHashGrouper = hybridHashGrouperFactory.createHybridHashGrouper(ctx, framesLimit, keys,
+ storedKeys, getNumberOfPartitions(framesLimit, inputSize, factor), hashtableSize, inRecDesc,
+ recordDescriptors[0], comparators, aggregateTpc, mergeTpc, aggregatorFactory, mergerFactory,
+ writer);
+ numOfPartitions = hybridHashGrouper.getNumOfPartitions();
+ }
+
+ private int getNumberOfPartitions(int framesLimit, int inputSize, double factor) {
+ int numberOfPartitions = (int) (Math.ceil((double) (inputSize * factor / nPartitions - framesLimit)
+ / (double) (framesLimit - 1)));
+ if (numberOfPartitions <= 0) {
+ numberOfPartitions = 1;
+ }
+ return numberOfPartitions;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ hybridHashGrouper.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ hybridHashGrouper.close();
+ // process run files
+ for (int i = hybridHashGrouper.partitionStatus.nextSetBit(0); i >= 0; i = hybridHashGrouper.partitionStatus
+ .nextSetBit(i + 1)) {
+ if (hybridHashGrouper.getSpilledRunWriter(i) == null) {
+ continue;
+ }
+ RunFileReader runReader = hybridHashGrouper.getSpilledRunWriter(i).createReader();
+ int runFileSizeInFrame = hybridHashGrouper.getPartitionSizeInFrame(i);
+ processRunFile(runReader, runFileSizeInFrame, hashtableSize / numOfPartitions, 0);
+ }
+ writer.close();
+ }
+
+ private void processRunFile(RunFileReader runReader, int runFileSize, int htSize, int level)
+ throws HyracksDataException {
+ LOGGER.warning("processRunFile " + runFileSize + " " + htSize);
+ int partitionCount = getNumberOfPartitions(framesLimit, runFileSize, factor);
+ ITuplePartitionComputer partitionComputer = new FieldHashPartitionComputerFamily(storedKeys,
+ hashFunctionFamilies).createPartitioner(level + 1);
+ AbstractImprovedHybridHashGrouper grouper = hybridHashGrouperFactory.createHybridHashGrouper(ctx,
+ framesLimit, storedKeys, storedKeys, partitionCount, hashtableSize, internalRecordDescriptor,
+ recordDescriptors[0], comparators, partitionComputer, partitionComputer, mergerFactory,
+ mergerFactory, writer);
+
+ runReader.open();
+
+ if (runLoadFrame == null) {
+ runLoadFrame = ctx.allocateFrame();
+ }
+
+ while (runReader.nextFrame(runLoadFrame)) {
+ grouper.nextFrame(runLoadFrame);
+ }
+
+ runReader.close();
+ grouper.close();
+
+ if (grouper.hasSpilledPartition()) {
+ if (grouper.getPartitionMaxFrameSize() < HYBRID_SWITCH_THRESHOLD * runFileSize) {
+ // recursive hybrid-hash-group
+ for (int i = grouper.partitionStatus.nextSetBit(0); i >= 0; i = grouper.partitionStatus
+ .nextSetBit(i + 1)) {
+ RunFileReader runFileReader = grouper.getSpilledRunWriter(i).createReader();
+ int runFileSizeInFrame = grouper.getPartitionSizeInFrame(i);
+ processRunFile(runFileReader, runFileSizeInFrame, htSize / partitionCount, level + 1);
+ }
+ } else {
+ // switch to external-hash-group
+ int tableSize = 0;
+
+ for (int i = grouper.partitionStatus.nextSetBit(0); i >= 0; i = grouper.partitionStatus
+ .nextSetBit(i + 1)) {
+ tableSize += grouper.partitionSizeInTuple[i] * 2;
+ }
+
+ // use external-hash-group
+ ExternalHashGrouperAggregate aggregator = new ExternalHashGrouperAggregate(ctx, storedKeys,
+ framesLimit, tableSize, comparatorFactories, null, mergerFactory,
+ internalRecordDescriptor, recordDescriptors[0], new HashSpillableFrameSortTableFactory(
+ new ITuplePartitionComputerFactory() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new FieldHashPartitionComputerFamily(storedKeys,
+ hashFunctionFamilies).createPartitioner(0);
+ }
+ }), writer, false);
+ aggregator.initGrouperForAggregate();
+
+ for (int i = grouper.partitionStatus.nextSetBit(0); i >= 0; i = grouper.partitionStatus
+ .nextSetBit(i + 1)) {
+ RunFileReader rReader = grouper.getSpilledRunWriter(i).createReader();
+ rReader.open();
+ while (rReader.nextFrame(runLoadFrame)) {
+ aggregator.insertFrame(runLoadFrame);
+ }
+ rReader.close();
+ }
+ aggregator.finishAggregation();
+ LinkedList<RunFileReader> runFiles = aggregator.getRunReaders();
+ if (!runFiles.isEmpty()) {
+ ExternalHashGrouperMerge merger = new ExternalHashGrouperMerge(ctx, storedKeys,
+ framesLimit, comparatorFactories, mergerFactory, internalRecordDescriptor,
+ recordDescriptors[0], false, writer);
+ merger.initialize(aggregator.getGroupTable(), runFiles);
+ }
+ }
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SerializableHashTableWithFrameAllocator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SerializableHashTableWithFrameAllocator.java
new file mode 100644
index 0000000..b9ff385
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/SerializableHashTableWithFrameAllocator.java
@@ -0,0 +1,331 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+public class SerializableHashTableWithFrameAllocator {
+
+ private static final int INT_SIZE = 4;
+ private static final int INIT_ENTRY_SIZE = 4;
+
+ private IntSerDeBuffer[] headers;
+ private List<IntSerDeBuffer> contents = new ArrayList<IntSerDeBuffer>();
+
+ private List<Integer> frameCurrentIndex = new ArrayList<Integer>();
+ private final IFrameAllocator frameAllocator;
+
+ private int frameCapacity = 0;
+ private int currentLargestFrameIndex = 0;
+ private int tupleCount = 0;
+ private int headerFrameCount = 0;
+ private TuplePointer tempTuplePointer = new TuplePointer();
+
+ private int beginFrameIndex, lastFrameIndex;
+
+ public SerializableHashTableWithFrameAllocator(int tableSize, int frameSize, IFrameAllocator frameAllocator)
+ throws HyracksDataException {
+ this.frameAllocator = frameAllocator;
+
+ int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1;
+ int headerSize = tableSize / frameSize * INT_SIZE * 2 + residual;
+ headers = new IntSerDeBuffer[headerSize];
+
+ this.beginFrameIndex = frameAllocator.allocateFrame();
+ this.lastFrameIndex = beginFrameIndex;
+ byte[] headerFrame = frameAllocator.getFrame(beginFrameIndex);
+ if (headerFrame == null) {
+ throw new HyracksDataException("Cannot initialize the hashtable: not enough memory");
+ }
+
+ IntSerDeBuffer frame = new IntSerDeBuffer(headerFrame);
+ resetFrame(frame);
+ contents.add(frame);
+ frameCurrentIndex.add(0);
+ frameCapacity = frame.capacity();
+ }
+
+ public boolean insert(int entry, TuplePointer pointer) throws HyracksDataException {
+ int hFrameIndex = getHeaderFrameIndex(entry);
+ int headerOffset = getHeaderFrameOffset(entry);
+ IntSerDeBuffer header = headers[hFrameIndex];
+ if (header == null) {
+ int newFrameIndex = frameAllocator.allocateFrameForOverflowing(lastFrameIndex);
+ if (newFrameIndex < 0) {
+ return false;
+ }
+ this.lastFrameIndex = newFrameIndex;
+ byte[] newFrame = frameAllocator.getFrame(lastFrameIndex);
+ if (newFrame == null) {
+ return false;
+ }
+ header = new IntSerDeBuffer(newFrame);
+ headers[hFrameIndex] = header;
+ resetFrame(header);
+ headerFrameCount++;
+ }
+ int frameIndex = header.getInt(headerOffset);
+ int offsetIndex = header.getInt(headerOffset + 1);
+ boolean isInserted;
+ if (frameIndex < 0) {
+ // insert first tuple into the entry
+ isInserted = insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer);
+ } else {
+ // insert non-first tuple into the entry
+ isInserted = insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer);
+ }
+ if (isInserted) {
+ tupleCount++;
+ }
+ return isInserted;
+ }
+
+ public void getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
+ int hFrameIndex = getHeaderFrameIndex(entry);
+ int headerOffset = getHeaderFrameOffset(entry);
+ IntSerDeBuffer header = headers[hFrameIndex];
+ if (header == null) {
+ dataPointer.frameIndex = -1;
+ dataPointer.tupleIndex = -1;
+ return;
+ }
+ int frameIndex = header.getInt(headerOffset);
+ int offsetIndex = header.getInt(headerOffset + 1);
+ if (frameIndex < 0) {
+ dataPointer.frameIndex = -1;
+ dataPointer.tupleIndex = -1;
+ return;
+ }
+ IntSerDeBuffer frame = contents.get(frameIndex);
+ int entryUsedItems = frame.getInt(offsetIndex + 1);
+ if (offset > entryUsedItems - 1) {
+ dataPointer.frameIndex = -1;
+ dataPointer.tupleIndex = -1;
+ return;
+ }
+ int startIndex = offsetIndex + 2 + offset * 2;
+ while (startIndex >= frameCapacity) {
+ ++frameIndex;
+ startIndex -= frameCapacity;
+ }
+ frame = contents.get(frameIndex);
+ dataPointer.frameIndex = frame.getInt(startIndex);
+ dataPointer.tupleIndex = frame.getInt(startIndex + 1);
+ }
+
+ public void reset() {
+ for (IntSerDeBuffer frame : headers)
+ if (frame != null)
+ resetFrame(frame);
+
+ frameCurrentIndex.clear();
+ for (int i = 0; i < contents.size(); i++) {
+ frameCurrentIndex.add(0);
+ }
+
+ currentLargestFrameIndex = 0;
+ tupleCount = 0;
+ }
+
+ public int getFrameCount() {
+ return headerFrameCount + contents.size();
+ }
+
+ public int getTupleCount() {
+ return tupleCount;
+ }
+
+ public void close() {
+ headers = null;
+ contents.clear();
+ frameCurrentIndex.clear();
+ tupleCount = 0;
+ currentLargestFrameIndex = 0;
+ lastFrameIndex = -1;
+ beginFrameIndex = -1;
+ }
+
+ private boolean insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer)
+ throws HyracksDataException {
+ IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex);
+ int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex);
+ int requiredIntCapacity = entryCapacity * 2;
+ int startFrameIndex = currentLargestFrameIndex;
+
+ if (lastIndex + requiredIntCapacity >= frameCapacity) {
+ IntSerDeBuffer newFrame;
+ startFrameIndex++;
+ do {
+ if (currentLargestFrameIndex >= contents.size() - 1) {
+ int newFrameIndex = frameAllocator.allocateFrameForOverflowing(lastFrameIndex);
+ this.lastFrameIndex = newFrameIndex;
+ byte[] allocatedFrame = frameAllocator.getFrame(lastFrameIndex);
+ if (allocatedFrame == null) {
+ return false;
+ }
+ newFrame = new IntSerDeBuffer(allocatedFrame);
+ currentLargestFrameIndex++;
+ contents.add(newFrame);
+ frameCurrentIndex.add(0);
+ } else {
+ currentLargestFrameIndex++;
+ frameCurrentIndex.set(currentLargestFrameIndex, 0);
+ }
+ requiredIntCapacity -= frameCapacity;
+ } while (requiredIntCapacity > 0);
+ lastIndex = 0;
+ lastFrame = contents.get(startFrameIndex);
+ }
+
+ // set header
+ header.writeInt(headerOffset, startFrameIndex);
+ header.writeInt(headerOffset + 1, lastIndex);
+
+ // set the entry
+ lastFrame.writeInt(lastIndex, entryCapacity - 1);
+ lastFrame.writeInt(lastIndex + 1, 1);
+ lastFrame.writeInt(lastIndex + 2, pointer.frameIndex);
+ lastFrame.writeInt(lastIndex + 3, pointer.tupleIndex);
+ int newLastIndex = lastIndex + entryCapacity * 2;
+ newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1;
+ frameCurrentIndex.set(startFrameIndex, newLastIndex);
+
+ requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex);
+ while (requiredIntCapacity > 0) {
+ startFrameIndex++;
+ requiredIntCapacity -= frameCapacity;
+ newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1;
+ frameCurrentIndex.set(startFrameIndex, newLastIndex);
+ }
+ return true;
+ }
+
+ private boolean insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndex, int offsetIndex,
+ TuplePointer pointer) throws HyracksDataException {
+ IntSerDeBuffer frame = contents.get(frameIndex);
+ int entryItems = frame.getInt(offsetIndex);
+ int entryUsedItems = frame.getInt(offsetIndex + 1);
+
+ if (entryUsedItems < entryItems) {
+ frame.writeInt(offsetIndex + 1, entryUsedItems + 1);
+ int startIndex = offsetIndex + 2 + entryUsedItems * 2;
+ while (startIndex >= frameCapacity) {
+ ++frameIndex;
+ startIndex -= frameCapacity;
+ }
+ frame = contents.get(frameIndex);
+ frame.writeInt(startIndex, pointer.frameIndex);
+ frame.writeInt(startIndex + 1, pointer.tupleIndex);
+ } else {
+ int capacity = (entryItems + 1) * 2;
+ int headFrameVal = header.getInt(headerOffset);
+ int headOffsetVal = header.getInt(headerOffset + 1);
+ header.writeInt(headerOffset, -1);
+ header.writeInt(headerOffset + 1, -1);
+ int fIndex = frame.getInt(offsetIndex + 2);
+ int tIndex = frame.getInt(offsetIndex + 3);
+ tempTuplePointer.frameIndex = fIndex;
+ tempTuplePointer.tupleIndex = tIndex;
+ if (!insertNewEntry(header, headerOffset, capacity, tempTuplePointer)) {
+ header.writeInt(headerOffset, headFrameVal);
+ header.writeInt(headerOffset + 1, headOffsetVal);
+ return false;
+ }
+
+ int newFrameIndex = header.getInt(headerOffset);
+ int newTupleIndex = header.getInt(headerOffset + 1);
+
+ for (int i = 1; i < entryUsedItems; i++) {
+ int startIndex = offsetIndex + 2 + i * 2;
+ int startFrameIndex = frameIndex;
+ while (startIndex >= frameCapacity) {
+ ++startFrameIndex;
+ startIndex -= frameCapacity;
+ }
+ frame = contents.get(startFrameIndex);
+ fIndex = frame.getInt(startIndex);
+ tIndex = frame.getInt(startIndex + 1);
+ tempTuplePointer.frameIndex = fIndex;
+ tempTuplePointer.tupleIndex = tIndex;
+ if (!insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer)) {
+ throw new HyracksDataException("Failed to allocate space to expand the entry size.");
+ }
+ }
+ if (!insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer)) {
+ throw new HyracksDataException("Failed to insert the tuple after the entry has been expanded.");
+ }
+ }
+ return true;
+ }
+
+ private void resetFrame(IntSerDeBuffer frame) {
+ for (int i = 0; i < frameCapacity; i++)
+ frame.writeInt(i, -1);
+ }
+
+ private int getHeaderFrameIndex(int entry) {
+ int frameIndex = entry * 2 / frameCapacity;
+ return frameIndex;
+ }
+
+ private int getHeaderFrameOffset(int entry) {
+ int offset = entry * 2 % frameCapacity;
+ return offset;
+ }
+
+ public int getLastFrame() {
+ return lastFrameIndex;
+ }
+
+ public static int getInitialFrameCount() {
+ return 1;
+ }
+
+ public static int getMaxHeaderFrameCount(int hashtableSize, int frameSize) {
+ return (int) (Math.ceil(hashtableSize * 2 / (frameSize / 4)));
+ }
+
+ private class IntSerDeBuffer {
+
+ private byte[] bytes;
+
+ public IntSerDeBuffer(byte[] data) {
+ this.bytes = data;
+ }
+
+ public int getInt(int pos) {
+ int offset = pos * 4;
+ return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
+ + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0);
+ }
+
+ public void writeInt(int pos, int value) {
+ int offset = pos * 4;
+ bytes[offset++] = (byte) (value >> 24);
+ bytes[offset++] = (byte) (value >> 16);
+ bytes[offset++] = (byte) (value >> 8);
+ bytes[offset++] = (byte) (value);
+ }
+
+ public int capacity() {
+ return bytes.length / 4;
+ }
+ }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/TupleInFrameAccessor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/TupleInFrameAccessor.java
new file mode 100644
index 0000000..4826c47
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/TupleInFrameAccessor.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class TupleInFrameAccessor {
+
+ private final RecordDescriptor recordDescriptor;
+
+ private ByteBuffer buffer;
+
+ private int tupleOffset;
+
+ public TupleInFrameAccessor(RecordDescriptor recordDescriptor) {
+ this.recordDescriptor = recordDescriptor;
+ }
+
+ public void reset(ByteBuffer buffer, int tupleOffset) {
+ this.buffer = buffer;
+ this.tupleOffset = tupleOffset;
+ }
+
+ public int getTupleStartOffset() {
+ return tupleOffset;
+ }
+
+ public int getTupleEndOffset() {
+ return tupleOffset + getFieldCount() * 4 + buffer.getInt(tupleOffset + (getFieldCount() - 1) * 4);
+ }
+
+ public int getFieldStartOffset(int fIdx) {
+ return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset() + (fIdx - 1) * 4);
+ }
+
+ public int getFieldEndOffset(int fIdx) {
+ return buffer.getInt(getTupleStartOffset() + fIdx * 4);
+ }
+
+ public int getFieldLength(int fIdx) {
+ return getFieldEndOffset(fIdx) - getFieldStartOffset(fIdx);
+ }
+
+ public int getFieldSlotsLength() {
+ return getFieldCount() * 4;
+ }
+
+ public int getFieldCount() {
+ return recordDescriptor.getFieldCount();
+ }
+
+ public ByteBuffer getBuffer(){
+ return buffer;
+ }
+
+ public int getTupleLength(){
+ return getTupleEndOffset() - tupleOffset;
+ }
+
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index 2e781b5..760ca5b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -167,6 +167,14 @@
return new AggregateState(new Integer[] { 0, 0 });
}
+ @Override
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+ if (useObjectState) {
+ return 0;
+ } else {
+ return 8;
+ }
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index cc5c1e1..53e62ee 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -167,6 +167,14 @@
state.state = new Integer[] { sum, count };
}
}
+
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+ if (useObjectState) {
+ return 0;
+ } else {
+ return 8;
+ }
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index 9bfec8e..12f4325 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -135,6 +135,15 @@
state.state = count;
}
}
+
+ @Override
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+ if (useObjectState) {
+ return 0;
+ } else {
+ return 4;
+ }
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
index f8b1d74..851d874 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/FloatSumFieldAggregatorFactory.java
@@ -30,37 +30,35 @@
/**
*
*/
-public class FloatSumFieldAggregatorFactory implements
- IFieldAggregateDescriptorFactory {
+public class FloatSumFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
private static final long serialVersionUID = 1L;
private final int aggField;
private final boolean useObjectState;
-
- public FloatSumFieldAggregatorFactory(int aggField, boolean useObjState){
+
+ public FloatSumFieldAggregatorFactory(int aggField, boolean useObjState) {
this.aggField = aggField;
this.useObjectState = useObjState;
}
-
+
/* (non-Javadoc)
* @see edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
*/
@Override
- public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
- RecordDescriptor inRecordDescriptor,
+ public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor) throws HyracksDataException {
return new IFieldAggregateDescriptor() {
-
+
@Override
public void reset() {
-
+
}
-
+
@Override
- public void outputPartialResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
+ public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+ throws HyracksDataException {
float sum;
if (!useObjectState) {
sum = FloatSerializerDeserializer.getFloat(data, offset);
@@ -73,10 +71,10 @@
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
-
+
@Override
- public void outputFinalResult(DataOutput fieldOutput, byte[] data,
- int offset, AggregateState state) throws HyracksDataException {
+ public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+ throws HyracksDataException {
float sum;
if (!useObjectState) {
sum = FloatSerializerDeserializer.getFloat(data, offset);
@@ -89,20 +87,19 @@
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
}
-
+
@Override
public boolean needsObjectState() {
return useObjectState;
}
-
+
@Override
public boolean needsBinaryState() {
return !useObjectState;
}
-
+
@Override
- public void init(IFrameTupleAccessor accessor, int tIndex,
- DataOutput fieldOutput, AggregateState state)
+ public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
throws HyracksDataException {
float sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -121,22 +118,21 @@
state.state = sum;
}
}
-
+
@Override
public AggregateState createState() {
return new AggregateState(new Float(0.0));
}
-
+
@Override
public void close() {
// TODO Auto-generated method stub
-
+
}
-
+
@Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- byte[] data, int offset, AggregateState state)
- throws HyracksDataException {
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+ AggregateState state) throws HyracksDataException {
float sum = 0;
int tupleOffset = accessor.getTupleStartOffset(tIndex);
int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
@@ -152,6 +148,15 @@
state.state = sum;
}
}
+
+ @Override
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+ if (useObjectState) {
+ return 0;
+ } else {
+ return 4;
+ }
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index 7d85deb..d5f8637 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -151,6 +151,15 @@
state.state = sum;
}
}
+
+ @Override
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+ if (useObjectState) {
+ return 0;
+ } else {
+ return 4;
+ }
+ }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 94ebcbd..b8df344 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -194,6 +194,15 @@
return new AggregateState();
}
+ @Override
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) {
+ if (hasBinaryState) {
+ return 4;
+ } else {
+ return 0;
+ }
+ }
+
};
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 394a5e3..8d06708 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.TupleInFrameAccessor;
/**
*
@@ -203,6 +204,43 @@
}
}
}
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, TupleInFrameAccessor stateAccessor,
+ AggregateState state) throws HyracksDataException {
+ if (stateAccessor != null) {
+ int stateTupleOffset = stateAccessor.getTupleStartOffset();
+ int fieldIndex = 0;
+ for (int i = 0; i < aggregators.length; i++) {
+ if (aggregators[i].needsBinaryState()) {
+ int stateFieldOffset = stateAccessor.getFieldStartOffset(keys.length + fieldIndex);
+ aggregators[i].aggregate(accessor, tIndex, stateAccessor.getBuffer().array(),
+ stateTupleOffset + stateAccessor.getFieldSlotsLength() + stateFieldOffset,
+ ((AggregateState[]) state.state)[i]);
+ fieldIndex++;
+ } else {
+ aggregators[i].aggregate(accessor, tIndex, null, 0, ((AggregateState[]) state.state)[i]);
+ }
+ }
+ } else {
+ for (int i = 0; i < aggregators.length; i++) {
+ aggregators[i].aggregate(accessor, tIndex, null, 0, ((AggregateState[]) state.state)[i]);
+ }
+ }
+ }
+
+ @Override
+ public int getInitSize(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ int initLength = 0;
+ for (int i = 0; i < aggregators.length; i++)
+ initLength += aggregators[i].getInitSize(accessor, tIndex);
+ return initLength;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return aggregators.length;
+ }
};
}
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ImprovedHybridHashGroupTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ImprovedHybridHashGroupTest.java
new file mode 100644
index 0000000..544d8a7
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ImprovedHybridHashGroupTest.java
@@ -0,0 +1,424 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.ImprovedHybridHashGroupFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.ImprovedHybridHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+
+public class ImprovedHybridHashGroupTest extends AbstractIntegrationTest {
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
+ new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+
+ final int inputSize = 2300;
+
+ final int frameLimits = 6;
+
+ final double factor = 1.2;
+
+ final int hashtableSize = 12000;
+
+ final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|');
+
+ private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
+ throws IOException {
+
+ AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
+ new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC1_ID, createTempFile().getAbsolutePath()),
+ new FileSplit(NC2_ID, createTempFile().getAbsolutePath()) }), "\t");
+
+ return printer;
+ }
+
+ @Test
+ public void singleKeySumGroupOneNodeTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+
+ ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+ frameLimits, inputSize, factor, hashtableSize, keyFields,
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false),
+ new FloatSumFieldAggregatorFactory(5, false) }), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false),
+ new FloatSumFieldAggregatorFactory(3, false) }), outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID);
+
+ IConnectorDescriptor conn1 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumGroupOneNodeTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeySumGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+
+ ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+ frameLimits, inputSize, factor, hashtableSize, keyFields,
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false),
+ new FloatSumFieldAggregatorFactory(5, false) }), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false),
+ new FloatSumFieldAggregatorFactory(3, false) }), outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyAvgExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+
+ ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+ frameLimits, inputSize, factor, hashtableSize, keyFields,
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new ImprovedHybridHashGroupFactory(),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(2, false),
+ new AvgFieldMergeAggregatorFactory(3, false) }), outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void singleKeyMinMaxStringExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+
+ ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+ frameLimits, inputSize, factor, hashtableSize, keyFields,
+ new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(2, true, true) }), outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void multiKeySumExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 8, 0 };
+
+ ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+ frameLimits * 2, 3 * inputSize, factor, hashtableSize * 3, keyFields, new IBinaryHashFunctionFamily[] {
+ UTF8StringBinaryHashFunctionFamily.INSTANCE, UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+ new IntSumFieldAggregatorFactory(3, false) }), outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void multiKeyAvgExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 8, 0 };
+
+ ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+ frameLimits * 2, 3 * inputSize, factor, 3 * hashtableSize, keyFields, new IBinaryHashFunctionFamily[] {
+ UTF8StringBinaryHashFunctionFamily.INSTANCE, UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new ImprovedHybridHashGroupFactory(),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+ new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+ new IntSumFieldAggregatorFactory(3, false),
+ new AvgFieldMergeAggregatorFactory(4, false) }), outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void multiKeyMinMaxStringExtGroupTest() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 8, 0 };
+
+ ImprovedHybridHashGroupOperatorDescriptor grouper = new ImprovedHybridHashGroupOperatorDescriptor(spec,
+ frameLimits * 2, 3 * inputSize, factor, 3 * hashtableSize, keyFields, new IBinaryHashFunctionFamily[] {
+ UTF8StringBinaryHashFunctionFamily.INSTANCE, UTF8StringBinaryHashFunctionFamily.INSTANCE },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new ImprovedHybridHashGroupFactory(), new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+ new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(2, false),
+ new MinMaxStringFieldAggregatorFactory(3, true, true) }), outputRec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringExtGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+}