Merge HybridHash from Jarod branch
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2689 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java
new file mode 100644
index 0000000..d63609e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java
@@ -0,0 +1,377 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+public class GroupRunMergingFrameReader implements IFrameReader {
+
+ private static final int INT_SIZE = 4;
+
+ private final IHyracksTaskContext ctx;
+ private final IFrameReader[] runCursors;
+ private final List<ByteBuffer> inFrames;
+ private final int[] keyFields;
+ private final int framesLimit;
+ private final int tableSize;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDesc;
+ private final FrameTupleAppender outFrameAppender;
+ private final ITuplePartitionComputer tpc;
+ private ReferencedPriorityQueue topTuples;
+ private int[] tupleIndexes;
+ private int[] currentFrameIndexForRuns, bufferedFramesForRuns;
+ private FrameTupleAccessor[] tupleAccessors;
+ private int framesBuffered;
+
+ private final IAggregatorDescriptor grouper;
+ private final AggregateState groupState;
+
+ private final boolean isLoadBuffered;
+
+ private final boolean isFinalPhase;
+
+ private final ArrayTupleBuilder groupTupleBuilder, outputTupleBuilder;
+
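+ // Cache holding the partial aggregation result for the group currently at the head of the
+ // merge: consecutive tuples with equal keys coming off the priority queue are aggregated
+ // into this single-tuple mini-frame before it is flushed to the output buffer.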
+ private byte[] groupResultCache;
+ private ByteBuffer groupResultCacheBuffer;
+ private IFrameTupleAccessor groupResultCacheAccessor;
+ private FrameTupleAppender groupResultCacheAppender;
+
+ // FIXME: debug-only counters for priority-queue and merge comparisons; remove once profiling is done
+ long queueCompCounter = 0, mergeCompCounter = 0;
+
+ public GroupRunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, int framesLimit,
+ int tableSize, int[] keyFields, ITuplePartitionComputer tpc, IBinaryComparator[] comparators,
+ IAggregatorDescriptor grouper, RecordDescriptor recordDesc, boolean isFinalPhase) {
+ this(ctx, runCursors, framesLimit, tableSize, keyFields, tpc, comparators, grouper, recordDesc, isFinalPhase,
+ false);
+ }
+
+ public GroupRunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, int framesLimit,
+ int tableSize, int[] keyFields, ITuplePartitionComputer tpc, IBinaryComparator[] comparators,
+ IAggregatorDescriptor grouper, RecordDescriptor recordDesc, boolean isFinalPhase, boolean isLoadBuffered) {
+ this.ctx = ctx;
+ this.runCursors = runCursors;
+ this.inFrames = new ArrayList<ByteBuffer>();
+ this.keyFields = keyFields;
+ this.tableSize = tableSize;
+ this.comparators = comparators;
+ this.recordDesc = recordDesc;
+ this.grouper = grouper;
+ this.groupState = grouper.createAggregateStates();
+ this.outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ this.isLoadBuffered = isLoadBuffered;
+ this.isFinalPhase = isFinalPhase;
+ this.framesLimit = framesLimit;
+ this.tpc = tpc;
+
+ this.groupTupleBuilder = new ArrayTupleBuilder(recordDesc.getFieldCount());
+ this.outputTupleBuilder = new ArrayTupleBuilder(recordDesc.getFieldCount());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameReader#open()
+ */
+ @Override
+ public void open() throws HyracksDataException {
+ if (isLoadBuffered) {
+ while (inFrames.size() + 1 < framesLimit) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ framesBuffered = inFrames.size() / runCursors.length;
+ } else {
+ while (inFrames.size() < framesLimit - 1 && inFrames.size() < runCursors.length) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ framesBuffered = 1;
+ }
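+ // With load-buffered merging each run gets an equal share (integer division) of the
+ // framesLimit - 1 buffered input frames; otherwise every run is read one frame at a time.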
+ tupleAccessors = new FrameTupleAccessor[runCursors.length];
+ currentFrameIndexForRuns = new int[runCursors.length];
+ bufferedFramesForRuns = new int[runCursors.length];
+ Comparator<ReferenceEntryWithBucketID> comparator = createEntryComparator(comparators);
+ topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator);
+ tupleIndexes = new int[runCursors.length];
+
+ for (int i = 0; i < runCursors.length; i++) {
+ int runIndex = topTuples.peek().getRunid();
+ tupleIndexes[runIndex] = 0;
+ runCursors[runIndex].open();
+ for (int j = 0; j < framesBuffered; j++) {
+
+ if (runCursors[runIndex].nextFrame(inFrames.get(runIndex * framesBuffered + j))) {
+
+ bufferedFramesForRuns[runIndex]++;
+ if (j == 0) {
+ tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+ tupleAccessors[runIndex].reset(inFrames.get(runIndex * framesBuffered + j));
+ setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+ currentFrameIndexForRuns[runIndex] = runIndex * framesBuffered;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameReader#nextFrame(java.nio.ByteBuffer)
+ */
+ @Override
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ outFrameAppender.reset(buffer, true);
+
+ while (!topTuples.areRunsExhausted()) {
+ ReferenceEntryWithBucketID top = topTuples.peek();
+ int runIndex = top.getRunid();
+ FrameTupleAccessor fta = top.getAccessor();
+ int tupleIndex = top.getTupleIndex();
+
+ // check whether we can do aggregation
+ boolean needInsert = true;
+ if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
+ groupResultCacheAccessor.reset(ByteBuffer.wrap(groupResultCache));
+ if (compareFrameTuples(fta, tupleIndex, groupResultCacheAccessor, 0) == 0) {
+ needInsert = false;
+ }
+ }
+
+ if (needInsert) {
+
+ // try to flush the group cache into the output buffer, if any
+ if (groupResultCacheAccessor != null && groupResultCacheAccessor.getFieldCount() > 0) {
+ outputTupleBuilder.reset();
+ for (int k = 0; k < keyFields.length; k++) {
+ outputTupleBuilder.addField(groupResultCacheAccessor, 0, k);
+ }
+ if (isFinalPhase) {
+ grouper.outputFinalResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
+ } else {
+ grouper.outputPartialResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
+ }
+
+ // return if the buffer is full
+ if (!outFrameAppender.append(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ return true;
+ }
+ // mark the cache as flushed by zeroing the tuple count stored in the last INT_SIZE bytes of the frame
+ groupResultCacheBuffer.putInt(groupResultCache.length - INT_SIZE, 0);
+ }
+
+ groupTupleBuilder.reset();
+ for (int k : keyFields) {
+ groupTupleBuilder.addField(fta, tupleIndex, k);
+ }
+ grouper.init(groupTupleBuilder, fta, tupleIndex, groupState);
+
+ // enlarge the cache buffer if necessary
+ int requiredSize = groupTupleBuilder.getSize() + groupTupleBuilder.getFieldEndOffsets().length
+ * INT_SIZE + 2 * INT_SIZE;
+
+ if (groupResultCache == null || groupResultCache.length < requiredSize) {
+ groupResultCache = new byte[requiredSize];
+ groupResultCacheAppender = new FrameTupleAppender(groupResultCache.length);
+ groupResultCacheBuffer = ByteBuffer.wrap(groupResultCache);
+ groupResultCacheAccessor = new FrameTupleAccessor(groupResultCache.length, recordDesc);
+ }
+
+ // always reset the group cache
+ groupResultCacheAppender.reset(groupResultCacheBuffer, true);
+ if (!groupResultCacheAppender.append(groupTupleBuilder.getFieldEndOffsets(),
+ groupTupleBuilder.getByteArray(), 0, groupTupleBuilder.getSize())) {
+ throw new HyracksDataException("The partial result is too large to be initialized in a frame.");
+ }
+
+ groupResultCacheAccessor.reset(groupResultCacheBuffer);
+
+ } else {
+ grouper.aggregate(fta, tupleIndex, groupResultCacheAccessor, 0, groupState);
+ }
+
+ ++tupleIndexes[runIndex];
+ setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+ }
+
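+ // All runs are exhausted: flush the last cached group, if any, before signalling end of data.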
+ if (groupResultCacheAccessor != null && groupResultCacheAccessor.getTupleCount() > 0) {
+ outputTupleBuilder.reset();
+ for (int k = 0; k < keyFields.length; k++) {
+ outputTupleBuilder.addField(groupResultCacheAccessor, 0, k);
+ }
+ if (isFinalPhase) {
+ grouper.outputFinalResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
+ } else {
+ grouper.outputPartialResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
+ }
+
+ // return if the buffer is full
+ if (!outFrameAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(), 0,
+ outputTupleBuilder.getSize())) {
+ return true;
+ }
+
+ groupResultCacheAccessor = null;
+ groupResultCache = null;
+ groupResultCacheBuffer = null;
+ groupResultCacheAppender = null;
+ }
+
+ if (outFrameAppender.getTupleCount() > 0) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameReader#close()
+ */
+ @Override
+ public void close() throws HyracksDataException {
+ for (int i = 0; i < runCursors.length; ++i) {
+ closeRun(i, runCursors, tupleAccessors);
+ }
+ }
+
+ private void setNextTopTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
+ boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+ if (exists) {
+ int h = tpc.partition(tupleAccessors[runIndex], tupleIndexes[runIndex], tableSize);
+ topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex], h);
+ } else {
+ topTuples.pop();
+ closeRun(runIndex, runCursors, tupleAccessors);
+ }
+ }
+
+ private boolean hasNextTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
+ if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+ return false;
+ } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
+ if (currentFrameIndexForRuns[runIndex] - runIndex * framesBuffered < bufferedFramesForRuns[runIndex] - 1) {
+ currentFrameIndexForRuns[runIndex]++;
+ } else {
+ bufferedFramesForRuns[runIndex] = 0;
+ for (int j = 0; j < framesBuffered; j++) {
+ if (runCursors[runIndex].nextFrame(inFrames.get(runIndex * framesBuffered + j))) {
+ bufferedFramesForRuns[runIndex]++;
+ } else {
+ break;
+ }
+ }
+ currentFrameIndexForRuns[runIndex] = runIndex * framesBuffered;
+ }
+ if (bufferedFramesForRuns[runIndex] > 0) {
+ tupleAccessors[runIndex].reset(inFrames.get(currentFrameIndexForRuns[runIndex]));
+ tupleIndexes[runIndex] = 0;
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+ }
+
+ private void closeRun(int index, IFrameReader[] runCursors, IFrameTupleAccessor[] tupleAccessors)
+ throws HyracksDataException {
+ if (runCursors[index] != null) {
+ runCursors[index].close();
+ runCursors[index] = null;
+ tupleAccessors[index] = null;
+ }
+ }
+
+ private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+ mergeCompCounter++;
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldLength(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldLength(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
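+ // The priority queue orders run heads first by their hash bucket id and only then by the
+ // group keys, so tuples belonging to the same bucket (and the same group) surface consecutively.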
+ private Comparator<ReferenceEntryWithBucketID> createEntryComparator(final IBinaryComparator[] comparators) {
+ return new Comparator<ReferenceEntryWithBucketID>() {
+ public int compare(ReferenceEntryWithBucketID tp1, ReferenceEntryWithBucketID tp2) {
+
+ queueCompCounter++;
+
+ int cmp = tp1.getBucketID() - tp2.getBucketID();
+
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+ int j1 = tp1.getTupleIndex();
+ int j2 = tp2.getTupleIndex();
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = keyFields[f];
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+
+ return 0;
+ }
+ };
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
new file mode 100644
index 0000000..57a364b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
@@ -0,0 +1,687 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.FrameTupleAccessorForGroupHashtable;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.FrameTupleAppenderForGroupHashtable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+public class HybridHashSortGroupHashTable {
+
+ protected static final int INT_SIZE = 4;
+ protected static final int INIT_REF_COUNT = 8;
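+ // Each sort pointer in tPointers occupies PTR_SIZE int slots: frame index, tuple index in
+ // that frame, and the normalized key used as a cheap first-pass comparator.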
+ protected static final int PTR_SIZE = 3;
+
+ protected final int tableSize, framesLimit, frameSize;
+
+ protected final ByteBuffer[] headers;
+ protected final ByteBuffer[] contents;
+
+ protected final IHyracksTaskContext ctx;
+
+ protected int currentLargestFrameIndex;
+ protected int totalTupleCount;
+
+ protected final IAggregatorDescriptor aggregator;
+ protected final AggregateState aggState;
+
+ protected final int[] keys, internalKeys;
+
+ private final IBinaryComparator[] comparators;
+
+ protected final ITuplePartitionComputer tpc;
+
+ protected final INormalizedKeyComputer firstNormalizer;
+
+ private ByteBuffer outputBuffer;
+
+ private LinkedList<RunFileReader> runReaders;
+
+ protected TuplePointer matchPointer;
+
+ protected final FrameTupleAccessorForGroupHashtable hashtableRecordAccessor;
+
+ private final FrameTupleAccessorForGroupHashtable compFrameAccessor1, compFrameAccessor2;
+
+ protected final FrameTupleAppenderForGroupHashtable internalAppender;
+
+ private final FrameTupleAppender outputAppender;
+
+ /**
+ * Tuple builders for hash table insertion and for writing results out
+ */
+ protected final ArrayTupleBuilder internalTupleBuilder, outputTupleBuilder;
+
+ /**
+ * Sort pointers for the records in an entry (PTR_SIZE int slots per record)
+ */
+ protected int[] tPointers;
+
+ protected int usedEntries = 0;
+
+ protected long hashedKeys = 0, hashedRawRec = 0;
+
+ public HybridHashSortGroupHashTable(IHyracksTaskContext ctx, int frameLimits, int tableSize, int[] keys,
+ IBinaryComparator[] comparators, ITuplePartitionComputer tpc,
+ INormalizedKeyComputer firstNormalizerComputer, IAggregatorDescriptor aggregator,
+ RecordDescriptor inRecDesc, RecordDescriptor outRecDesc) {
+ this.ctx = ctx;
+ this.tableSize = tableSize;
+ this.framesLimit = frameLimits;
+ this.frameSize = ctx.getFrameSize();
+
+ this.keys = keys;
+ this.internalKeys = new int[keys.length];
+ for (int i = 0; i < internalKeys.length; i++) {
+ internalKeys[i] = i;
+ }
+
+ this.aggregator = aggregator;
+ this.aggState = aggregator.createAggregateStates();
+
+ this.tpc = tpc;
+ this.comparators = comparators;
+ this.firstNormalizer = firstNormalizerComputer;
+
+ // initialize the hash table: each entry needs 2 * INT_SIZE bytes of header space (a frame
+ // index plus a tuple index), and the expressions below size the header frame array accordingly
+ int residual = ((tableSize % frameSize) * INT_SIZE * 2) % frameSize == 0 ? 0 : 1;
+ this.headers = new ByteBuffer[tableSize / frameSize * INT_SIZE * 2 + tableSize % frameSize * 2 * INT_SIZE
+ / frameSize + residual];
+
+ this.outputBuffer = ctx.allocateFrame();
+
+ this.contents = new ByteBuffer[framesLimit - 1 - headers.length];
+ this.currentLargestFrameIndex = -1;
+ this.totalTupleCount = 0;
+
+ this.runReaders = new LinkedList<RunFileReader>();
+ this.hashtableRecordAccessor = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
+ this.compFrameAccessor1 = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
+ this.compFrameAccessor2 = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
+
+ this.internalTupleBuilder = new ArrayTupleBuilder(outRecDesc.getFieldCount());
+ this.outputTupleBuilder = new ArrayTupleBuilder(outRecDesc.getFieldCount());
+ this.internalAppender = new FrameTupleAppenderForGroupHashtable(frameSize);
+ this.outputAppender = new FrameTupleAppender(frameSize);
+
+ this.matchPointer = new TuplePointer();
+
+ }
+
+ /**
+ * Reset the given header frame, marking every slot as empty (-1).
+ *
+ * @param headerFrameIndex
+ */
+ protected void resetHeader(int headerFrameIndex) {
+ for (int i = 0; i < frameSize; i += INT_SIZE) {
+ headers[headerFrameIndex].putInt(i, -1);
+ }
+ }
+
+ /**
+ * Get the index of the header frame holding the reference for the given hash table entry.
+ *
+ * @param entry
+ * @return the header frame index
+ */
+ protected int getHeaderFrameIndex(int entry) {
+ int frameIndex = entry / frameSize * 2 * INT_SIZE + entry % frameSize * 2 * INT_SIZE / frameSize;
+ return frameIndex;
+ }
+
+ /**
+ * Get the byte offset, within its header frame, of the reference for the given hash table entry.
+ *
+ * @param entry
+ * @return the offset inside the header frame
+ */
+ protected int getHeaderTupleIndex(int entry) {
+ int offset = entry % frameSize * 2 * INT_SIZE % frameSize;
+ return offset;
+ }
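+
+ /*
+ * Example (assuming a 32768-byte frame and INT_SIZE = 4): entry 5000 needs 2 ints starting at
+ * byte 5000 * 8 = 40000 of the header area, which is one full frame of 32768 bytes plus 7232,
+ * so getHeaderFrameIndex(5000) returns 1 and getHeaderTupleIndex(5000) returns 7232.
+ */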
+
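+ // Insert path: hash the record to an entry, probe the entry's linked list for a matching
+ // group; on a match aggregate in place, otherwise append a new partial record to the content
+ // frames (spilling the whole table to a sorted run file first if memory is exhausted) and
+ // link it into the entry's chain.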
+ public void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
+
+ int entry = tpc.partition(accessor, tupleIndex, tableSize);
+
+ hashedRawRec++;
+
+ if (findMatch(entry, accessor, tupleIndex)) {
+ // find match; do aggregation
+ hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
+ aggregator.aggregate(accessor, tupleIndex, hashtableRecordAccessor, matchPointer.tupleIndex, aggState);
+ } else {
+
+ internalTupleBuilder.reset();
+ for (int k = 0; k < keys.length; k++) {
+ internalTupleBuilder.addField(accessor, tupleIndex, keys[k]);
+ }
+ aggregator.init(internalTupleBuilder, accessor, tupleIndex, aggState);
+ int insertFrameIndex = -1, insertTupleIndex = -1;
+ boolean inserted = false;
+
+ if (currentLargestFrameIndex < 0) {
+ currentLargestFrameIndex = 0;
+ }
+
+ if (contents[currentLargestFrameIndex] == null) {
+ contents[currentLargestFrameIndex] = ctx.allocateFrame();
+ }
+
+ internalAppender.reset(contents[currentLargestFrameIndex], false);
+ if (internalAppender.append(internalTupleBuilder.getFieldEndOffsets(), internalTupleBuilder.getByteArray(),
+ 0, internalTupleBuilder.getSize())) {
+ inserted = true;
+ insertFrameIndex = currentLargestFrameIndex;
+ insertTupleIndex = internalAppender.getTupleCount() - 1;
+ }
+
+ if (!inserted && currentLargestFrameIndex < contents.length - 1) {
+ currentLargestFrameIndex++;
+ if (contents[currentLargestFrameIndex] == null) {
+ contents[currentLargestFrameIndex] = ctx.allocateFrame();
+ }
+ internalAppender.reset(contents[currentLargestFrameIndex], true);
+ if (!internalAppender.append(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+ throw new HyracksDataException("Failed to insert an aggregation value.");
+ } else {
+ insertFrameIndex = currentLargestFrameIndex;
+ insertTupleIndex = internalAppender.getTupleCount() - 1;
+ inserted = true;
+ }
+ }
+
+ // memory is full
+ if (!inserted) {
+ // flush hash table and try to insert again
+ flush();
+
+ // update the match point to the header reference
+ matchPointer.frameIndex = -1;
+ matchPointer.tupleIndex = -1;
+ // re-insert
+ currentLargestFrameIndex++;
+ if (contents[currentLargestFrameIndex] == null) {
+ contents[currentLargestFrameIndex] = ctx.allocateFrame();
+ }
+ internalAppender.reset(contents[currentLargestFrameIndex], true);
+ if (!internalAppender.append(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+ throw new HyracksDataException("Failed to insert an aggregation value.");
+ } else {
+ insertFrameIndex = currentLargestFrameIndex;
+ insertTupleIndex = internalAppender.getTupleCount() - 1;
+ }
+ }
+
+ // no match; new insertion
+ if (matchPointer.frameIndex < 0) {
+ // first record for this entry; update the header references
+ int headerFrameIndex = getHeaderFrameIndex(entry);
+ int headerFrameOffset = getHeaderTupleIndex(entry);
+ if (headers[headerFrameIndex] == null) {
+ headers[headerFrameIndex] = ctx.allocateFrame();
+ resetHeader(headerFrameIndex);
+ }
+ headers[headerFrameIndex].putInt(headerFrameOffset, insertFrameIndex);
+ headers[headerFrameIndex].putInt(headerFrameOffset + INT_SIZE, insertTupleIndex);
+ usedEntries++;
+
+ } else {
+ // update the previous reference
+ hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
+ int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(matchPointer.tupleIndex);
+ contents[matchPointer.frameIndex].putInt(refOffset, insertFrameIndex);
+ contents[matchPointer.frameIndex].putInt(refOffset + INT_SIZE, insertTupleIndex);
+ }
+ hashedKeys++;
+ totalTupleCount++;
+ }
+ }
+
+ /**
+ * Flush the hash table directly to the output
+ */
+ public void flushHashtableToOutput(IFrameWriter outputWriter) throws HyracksDataException {
+
+ // FIXME: remove this
+ outputAppender.reset(outputBuffer, true);
+ for (int i = 0; i < contents.length; i++) {
+ if (contents[i] == null) {
+ continue;
+ }
+ hashtableRecordAccessor.reset(contents[i]);
+ int tupleCount = hashtableRecordAccessor.getTupleCount();
+ for (int j = 0; j < tupleCount; j++) {
+ outputTupleBuilder.reset();
+
+ int tupleOffset = hashtableRecordAccessor.getTupleStartOffset(j);
+ int fieldOffset = hashtableRecordAccessor.getFieldCount() * INT_SIZE;
+
+ for (int k = 0; k < internalKeys.length; k++) {
+ outputTupleBuilder.addField(hashtableRecordAccessor.getBuffer().array(), tupleOffset + fieldOffset
+ + hashtableRecordAccessor.getFieldStartOffset(j, k),
+ hashtableRecordAccessor.getFieldLength(j, k));
+ }
+
+ aggregator.outputFinalResult(outputTupleBuilder, hashtableRecordAccessor, j, aggState);
+
+ if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(),
+ 0, outputTupleBuilder.getSize())) {
+
+ FrameUtils.flushFrame(outputBuffer, outputWriter);
+
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ throw new HyracksDataException("Failed to flush the hash table to the final output");
+ }
+ }
+ }
+ }
+
+ if (outputAppender.getTupleCount() > 0) {
+
+ FrameUtils.flushFrame(outputBuffer, outputWriter);
+
+ outputAppender.reset(outputBuffer, true);
+ }
+
+ totalTupleCount = 0;
+ usedEntries = 0;
+ }
+
+ /**
+ * Flush hash table into a run file.
+ *
+ * @throws HyracksDataException
+ */
+ protected void flush() throws HyracksDataException {
+
+ long methodTimer = System.nanoTime();
+
+ FileReference runFile;
+ try {
+ runFile = ctx.getJobletContext().createManagedWorkspaceFile(
+ HybridHashSortGroupHashTable.class.getSimpleName());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ RunFileWriter runWriter = new RunFileWriter(runFile, ctx.getIOManager());
+ runWriter.open();
+ flushEntries(runWriter);
+ runWriter.close();
+ runReaders.add(runWriter.createReader());
+ reset();
+
+ ctx.getCounterContext()
+ .getCounter("optional." + HybridHashSortGroupHashTable.class.getSimpleName() + ".flush.time", true)
+ .update(System.nanoTime() - methodTimer);
+ }
+
+ private void flushEntries(IFrameWriter writer) throws HyracksDataException {
+
+ outputAppender.reset(outputBuffer, true);
+ for (int i = 0; i < tableSize; i++) {
+ int tupleInEntry = sortEntry(i);
+
+ for (int ptr = 0; ptr < tupleInEntry; ptr++) {
+ int frameIndex = tPointers[ptr * PTR_SIZE];
+ int tupleIndex = tPointers[ptr * PTR_SIZE + 1];
+
+ hashtableRecordAccessor.reset(contents[frameIndex]);
+ outputTupleBuilder.reset();
+
+ int tupleOffset = hashtableRecordAccessor.getTupleStartOffset(tupleIndex);
+ int fieldOffset = hashtableRecordAccessor.getFieldCount() * INT_SIZE;
+
+ for (int k = 0; k < internalKeys.length; k++) {
+ outputTupleBuilder.addField(hashtableRecordAccessor.getBuffer().array(), tupleOffset + fieldOffset
+ + hashtableRecordAccessor.getFieldStartOffset(tupleIndex, k),
+ hashtableRecordAccessor.getFieldLength(tupleIndex, k));
+ }
+
+ aggregator.outputPartialResult(outputTupleBuilder, hashtableRecordAccessor, tupleIndex, aggState);
+
+ if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(),
+ 0, outputTupleBuilder.getSize())) {
+
+ FrameUtils.flushFrame(outputBuffer, writer);
+
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ throw new HyracksDataException("Failed to flush an aggregation result.");
+ }
+ }
+ totalTupleCount--;
+ }
+
+ if (tupleInEntry > 0) {
+ usedEntries--;
+ }
+ }
+
+ if (outputAppender.getTupleCount() > 0) {
+
+ FrameUtils.flushFrame(outputBuffer, writer);
+
+ outputAppender.reset(outputBuffer, true);
+ }
+ }
+
+ protected int sortEntry(int entryID) {
+
+ if (tPointers == null)
+ tPointers = new int[INIT_REF_COUNT * PTR_SIZE];
+ int ptr = 0;
+
+ int headerFrameIndex = entryID / frameSize * 2 * INT_SIZE + (entryID % frameSize) * 2 * INT_SIZE / frameSize;
+ int headerFrameOffset = (entryID % frameSize) * 2 * INT_SIZE % frameSize;
+
+ if (headers[headerFrameIndex] == null) {
+ return 0;
+ }
+
+ int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
+ int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
+
+ do {
+ if (entryFrameIndex < 0) {
+ break;
+ }
+ hashtableRecordAccessor.reset(contents[entryFrameIndex]);
+ tPointers[ptr * PTR_SIZE] = entryFrameIndex;
+ tPointers[ptr * PTR_SIZE + 1] = entryTupleIndex;
+ int tStart = hashtableRecordAccessor.getTupleStartOffset(entryTupleIndex);
+ int f0StartRel = hashtableRecordAccessor.getFieldStartOffset(entryTupleIndex, internalKeys[0]);
+ int f0EndRel = hashtableRecordAccessor.getFieldEndOffset(entryTupleIndex, internalKeys[0]);
+ int f0Start = f0StartRel + tStart + hashtableRecordAccessor.getFieldSlotsLength();
+ tPointers[ptr * PTR_SIZE + 2] = firstNormalizer == null ? 0 : firstNormalizer.normalize(
+ hashtableRecordAccessor.getBuffer().array(), f0Start, f0EndRel - f0StartRel);
+
+ ptr++;
+
+ if (ptr * PTR_SIZE >= tPointers.length) {
+ int[] newTPointers = new int[tPointers.length * 2];
+ System.arraycopy(tPointers, 0, newTPointers, 0, tPointers.length);
+ tPointers = newTPointers;
+ }
+
+ // move to the next record
+ int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
+ int prevFrameIndex = entryFrameIndex;
+ entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
+ entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
+
+ } while (true);
+
+ // sort records
+ if (ptr > 1) {
+ sort(0, ptr);
+ }
+
+ return ptr;
+ }
+
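+ // Quicksort over the tPointers slice [offset, offset + len), comparing by the cached
+ // normalized key first and falling back to a full key comparison on ties (a
+ // three-way-partitioning variant, similar to the other in-memory sorters in Hyracks).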
+ protected void sort(int offset, int len) {
+ int m = offset + (len >> 1);
+ int mFrameIndex = tPointers[m * PTR_SIZE];
+ int mTupleIndex = tPointers[m * PTR_SIZE + 1];
+ int mNormKey = tPointers[m * PTR_SIZE + 2];
+ compFrameAccessor1.reset(contents[mFrameIndex]);
+
+ int a = offset;
+ int b = a;
+ int c = offset + len - 1;
+ int d = c;
+ while (true) {
+ while (b <= c) {
+ int bFrameIndex = tPointers[b * PTR_SIZE];
+ int bTupleIndex = tPointers[b * PTR_SIZE + 1];
+ int bNormKey = tPointers[b * PTR_SIZE + 2];
+ int cmp = 0;
+ if (bNormKey != mNormKey) {
+ cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+ } else {
+ compFrameAccessor2.reset(contents[bFrameIndex]);
+ cmp = compare(compFrameAccessor2, bTupleIndex, compFrameAccessor1, mTupleIndex);
+ }
+ if (cmp > 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(a++, b);
+ }
+ ++b;
+ }
+ while (c >= b) {
+ int cFrameIndex = tPointers[c * PTR_SIZE];
+ int cTupleIndex = tPointers[c * PTR_SIZE + 1];
+ int cNormKey = tPointers[c * PTR_SIZE + 2];
+ int cmp = 0;
+ if (cNormKey != mNormKey) {
+ cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+ } else {
+ compFrameAccessor2.reset(contents[cFrameIndex]);
+ cmp = compare(compFrameAccessor2, cTupleIndex, compFrameAccessor1, mTupleIndex);
+ }
+ if (cmp < 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(c, d--);
+ }
+ --c;
+ }
+ if (b > c)
+ break;
+ swap(b++, c--);
+ }
+
+ int s;
+ int n = offset + len;
+ s = Math.min(a - offset, b - a);
+ vecswap(offset, b - s, s);
+ s = Math.min(d - c, n - d - 1);
+ vecswap(b, n - s, s);
+
+ if ((s = b - a) > 1) {
+ sort(offset, s);
+ }
+ if ((s = d - c) > 1) {
+ sort(n - s, s);
+ }
+ }
+
+ private void swap(int a, int b) {
+ for (int i = 0; i < PTR_SIZE; i++) {
+ int t = tPointers[a * PTR_SIZE + i];
+ tPointers[a * PTR_SIZE + i] = tPointers[b * PTR_SIZE + i];
+ tPointers[b * PTR_SIZE + i] = t;
+ }
+ }
+
+ private void vecswap(int a, int b, int n) {
+ for (int i = 0; i < n; i++, a++, b++) {
+ swap(a, b);
+ }
+ }
+
+ protected boolean findMatch(int entry, FrameTupleAccessor accessor, int tupleIndex) {
+
+ // reset the match pointer
+ matchPointer.frameIndex = -1;
+ matchPointer.tupleIndex = -1;
+
+ // get reference in the header
+ int headerFrameIndex = getHeaderFrameIndex(entry);
+ int headerFrameOffset = getHeaderTupleIndex(entry);
+
+ if (headers[headerFrameIndex] == null) {
+ return false;
+ }
+
+ // initialize the pointer to the first record
+ int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
+ int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
+
+ while (entryFrameIndex >= 0) {
+ matchPointer.frameIndex = entryFrameIndex;
+ matchPointer.tupleIndex = entryTupleIndex;
+ hashtableRecordAccessor.reset(contents[entryFrameIndex]);
+
+ if (compare(accessor, tupleIndex, hashtableRecordAccessor, entryTupleIndex) == 0) {
+ return true;
+ }
+ // Move to the next record in this entry following the linked list
+ int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
+ int prevFrameIndex = entryFrameIndex;
+ entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
+ entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
+ }
+
+ return false;
+ }
+
+ public LinkedList<RunFileReader> getRunFileReaders() {
+ return runReaders;
+ }
+
+ private int compare(FrameTupleAccessor accessor, int tupleIndex, FrameTupleAccessorForGroupHashtable hashAccessor,
+ int hashTupleIndex) {
+ int tStart0 = accessor.getTupleStartOffset(tupleIndex);
+ int fStartOffset0 = accessor.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = hashAccessor.getTupleStartOffset(hashTupleIndex);
+ int fStartOffset1 = hashAccessor.getFieldSlotsLength() + tStart1;
+
+ for (int i = 0; i < keys.length; ++i) {
+ int fStart0 = accessor.getFieldStartOffset(tupleIndex, keys[i]);
+ int fEnd0 = accessor.getFieldEndOffset(tupleIndex, keys[i]);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = hashAccessor.getFieldStartOffset(hashTupleIndex, internalKeys[i]);
+ int fEnd1 = hashAccessor.getFieldEndOffset(hashTupleIndex, internalKeys[i]);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = comparators[i].compare(accessor.getBuffer().array(), fStart0 + fStartOffset0, fLen0, hashAccessor
+ .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ private int compare(FrameTupleAccessorForGroupHashtable accessor1, int tupleIndex1,
+ FrameTupleAccessorForGroupHashtable accessor2, int tupleIndex2) {
+ int tStart1 = accessor1.getTupleStartOffset(tupleIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ int tStart2 = accessor2.getTupleStartOffset(tupleIndex2);
+ int fStartOffset2 = accessor2.getFieldSlotsLength() + tStart2;
+
+ for (int i = 0; i < internalKeys.length; ++i) {
+ int fStart1 = accessor1.getFieldStartOffset(tupleIndex1, internalKeys[i]);
+ int fEnd1 = accessor1.getFieldEndOffset(tupleIndex1, internalKeys[i]);
+ int fLen1 = fEnd1 - fStart1;
+
+ int fStart2 = accessor2.getFieldStartOffset(tupleIndex2, internalKeys[i]);
+ int fEnd2 = accessor2.getFieldEndOffset(tupleIndex2, internalKeys[i]);
+ int fLen2 = fEnd2 - fStart2;
+
+ int c = comparators[i].compare(accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1, accessor2
+ .getBuffer().array(), fStart2 + fStartOffset2, fLen2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ public void reset() {
+ for (int i = 0; i < headers.length; i++) {
+ if (headers[i] != null) {
+ resetHeader(i);
+ }
+ }
+ for (int i = 0; i < contents.length; i++) {
+ if (contents[i] != null) {
+ contents[i].putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
+ }
+ }
+
+ usedEntries = 0;
+ totalTupleCount = 0;
+ currentLargestFrameIndex = -1;
+ }
+
+ public void finishup() throws HyracksDataException {
+ // if anything has been spilled, flush the remaining in-memory groups into a final run so
+ // that the merge phase sees all of the data in run files
+ if (runReaders.size() > 0) {
+ flush();
+ }
+
+ hashedKeys = 0;
+ hashedRawRec = 0;
+ }
+
+ /**
+ * Close the hash table. Note that only the memory held by frames is released; the aggregation
+ * state in {@link #aggState} and the run file readers in {@link #runReaders} remain valid for
+ * later processing.
+ */
+ public void close() throws HyracksDataException {
+ for (int i = 0; i < headers.length; i++) {
+ headers[i] = null;
+ }
+ for (int i = 0; i < contents.length; i++) {
+ contents[i] = null;
+ }
+ outputBuffer = null;
+ tPointers = null;
+ }
+
+ public int getTupleCount() {
+ return totalTupleCount;
+ }
+
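+ // Note: despite its name, this returns the number of frames held by the table
+ // (header frames + content frames + the output buffer), not a size in bytes.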
+ public int getFrameSize() {
+ return headers.length + contents.length + 1;
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java
new file mode 100644
index 0000000..5296c9f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class HybridHashSortGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+
+ private static final int AGGREGATE_ACTIVITY_ID = 0;
+
+ private static final int MERGE_ACTIVITY_ID = 1;
+
+ private static final long serialVersionUID = 1L;
+ private final int[] keyFields, storedKeyFields;
+ private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+ private final IAggregatorDescriptorFactory mergerFactory;
+
+ private final ITuplePartitionComputerFactory aggTpcf, mergeTpcf;
+
+ private final int framesLimit;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+
+ private final int tableSize;
+
+ private final boolean isLoadOptimized;
+
+ public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+ int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
+ ITuplePartitionComputerFactory mergeTpcf, IAggregatorDescriptorFactory aggregatorFactory,
+ IAggregatorDescriptorFactory mergerFactory, RecordDescriptor recordDescriptor) {
+ this(spec, keyFields, framesLimit, tableSize, comparatorFactories, aggTpcf, mergeTpcf, null, aggregatorFactory,
+ mergerFactory, recordDescriptor, false);
+ }
+
+ public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+ int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
+ ITuplePartitionComputerFactory mergeTpcf, INormalizedKeyComputerFactory firstNormalizerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+ RecordDescriptor recordDescriptor) {
+ this(spec, keyFields, framesLimit, tableSize, comparatorFactories, aggTpcf, mergeTpcf, firstNormalizerFactory,
+ aggregatorFactory, mergerFactory, recordDescriptor, false);
+ }
+
+ public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+ int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
+ ITuplePartitionComputerFactory mergeTpcf, INormalizedKeyComputerFactory firstNormalizerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+ RecordDescriptor recordDescriptor, boolean isLoadOpt) {
+ super(spec, 1, 1);
+ this.framesLimit = framesLimit;
+ if (framesLimit <= 2) {
+ /**
+ * Minimum of 3 frames: 2 for in-memory hash table, and 1 for output
+ * aggregation results.
+ */
+ throw new IllegalStateException("frame limit should be at least 3, but it is " + framesLimit + "!");
+ }
+
+ storedKeyFields = new int[keyFields.length];
+ for (int i = 0; i < storedKeyFields.length; i++) {
+ storedKeyFields[i] = i;
+ }
+ this.aggregatorFactory = aggregatorFactory;
+ this.mergerFactory = mergerFactory;
+ this.keyFields = keyFields;
+ this.comparatorFactories = comparatorFactories;
+ this.firstNormalizerFactory = firstNormalizerFactory;
+ this.aggTpcf = aggTpcf;
+ this.mergeTpcf = mergeTpcf;
+ this.tableSize = tableSize;
+
+ /**
+ * Set the record descriptor. Note that since this operator is a unary
+ * operator, only the first record descriptor is used here.
+ */
+ recordDescriptors[0] = recordDescriptor;
+
+ this.isLoadOptimized = isLoadOpt;
+ }
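+
+ /*
+ * A hypothetical wiring sketch (the factory instances below are assumptions, not part of
+ * this class):
+ *
+ * IOperatorDescriptor grouper = new HybridHashSortGroupOperatorDescriptor(spec,
+ * new int[] { 0 }, // group-by key fields
+ * 32, // frames limit
+ * 10485, // hash table size
+ * comparatorFactories, aggTpcf, mergeTpcf, normalizerFactory,
+ * aggregatorFactory, mergerFactory, outRecDesc, true);
+ */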
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities(edu.uci.ics.hyracks.api.dataflow.
+ * IActivityGraphBuilder)
+ */
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
+ MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
+
+ builder.addActivity(this, aggregateAct);
+ builder.addSourceEdge(0, aggregateAct, 0);
+
+ builder.addActivity(this, mergeAct);
+ builder.addTargetEdge(0, mergeAct, 0);
+
+ builder.addBlockingEdge(aggregateAct, mergeAct);
+ }
+
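+ // The aggregate activity builds the in-memory hash table (spilling sorted runs as needed)
+ // and hands it to the merge activity through this task-local state object; the blocking
+ // edge above guarantees the merge only starts after aggregation has finished.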
+ public static class AggregateActivityState extends AbstractStateObject {
+
+ private HybridHashSortGroupHashTable gTable;
+
+ public AggregateActivityState() {
+ }
+
+ private AggregateActivityState(JobId jobId, TaskId tId) {
+ super(jobId, tId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private class AggregateActivity extends AbstractActivityNode {
+
+ private static final long serialVersionUID = 1L;
+
+ public AggregateActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+ HybridHashSortGroupHashTable serializableGroupHashtable;
+
+ FrameTupleAccessor accessor;
+
+ @Override
+ public void open() throws HyracksDataException {
+
+ RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; i++) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+
+ serializableGroupHashtable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
+ keyFields, comparators, aggTpcf.createPartitioner(),
+ firstNormalizerFactory.createNormalizedKeyComputer(), aggregatorFactory.createAggregator(
+ ctx, inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
+ recordDescriptors[0]);
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ serializableGroupHashtable.insert(accessor, i);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ serializableGroupHashtable.finishup();
+ AggregateActivityState state = new AggregateActivityState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition));
+ state.gTable = serializableGroupHashtable;
+ ctx.setStateObject(state);
+ }
+ };
+ }
+ }
+
+ private class MergeActivity extends AbstractActivityNode {
+
+ private static final long serialVersionUID = 1L;
+
+ public MergeActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+ public void initialize() throws HyracksDataException {
+
+ AggregateActivityState aggState = (AggregateActivityState) ctx.getStateObject(new TaskId(
+ new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID), partition));
+
+ LinkedList<RunFileReader> runs = aggState.gTable.getRunFileReaders();
+
+ writer.open();
+ if (runs.size() <= 0) {
+ aggState.gTable.flushHashtableToOutput(writer);
+ aggState.gTable.close();
+ } else {
+ aggState.gTable.close();
+
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; i++) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+
+ HybridHashSortRunMerger merger = new HybridHashSortRunMerger(ctx, runs, storedKeyFields,
+ comparators, recordDescriptors[0], mergeTpcf.createPartitioner(),
+ mergerFactory.createAggregator(ctx, recordDescriptors[0], recordDescriptors[0],
+ storedKeyFields, storedKeyFields), framesLimit, tableSize, writer,
+ isLoadOptimized);
+
+ merger.process();
+ }
+
+ writer.close();
+ }
+
+ };
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java
new file mode 100644
index 0000000..1de2237
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java
@@ -0,0 +1,488 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+
+public class HybridHashSortGrouperBucketMerge {
+
+ private final int[] keyFields;
+ private final IBinaryComparator[] comparators;
+
+ private final IAggregatorDescriptor merger;
+ private final AggregateState mergeState;
+
+ private final int framesLimit, tableSize;
+
+ private final RecordDescriptor inRecDesc;
+
+ private final IHyracksTaskContext ctx;
+
+ private final ArrayTupleBuilder tupleBuilder;
+
+ private final IFrameWriter outputWriter;
+
+ private final ITuplePartitionComputer tpc;
+
+ private final boolean isLoadOptimized;
+
+ List<ByteBuffer> inFrames;
+ ByteBuffer outFrame, writerFrame;
+ FrameTupleAppender outAppender, writerAppender;
+ LinkedList<RunFileReader> runs;
+ ArrayTupleBuilder finalTupleBuilder;
+ FrameTupleAccessor outFrameAccessor;
+ int[] currentFrameIndexInRun, currentRunFrames, currentBucketInRun;
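+ // number of input frames buffered per run during a merge pass; raised above 1 only in the
+ // final, load-optimized pass (see doPass)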
+ int runFrameLimit = 1;
+
+ public HybridHashSortGrouperBucketMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
+ ITuplePartitionComputer tpc, IBinaryComparator[] comparators, IAggregatorDescriptor merger,
+ RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, IFrameWriter outputWriter)
+ throws HyracksDataException {
+ this(ctx, keyFields, framesLimit, tableSize, tpc, comparators, merger, inRecDesc, outRecDesc, outputWriter,
+ true);
+ }
+
+ public HybridHashSortGrouperBucketMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
+ ITuplePartitionComputer tpc, IBinaryComparator[] comparators, IAggregatorDescriptor merger,
+ RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, IFrameWriter outputWriter, boolean loadOptimized)
+ throws HyracksDataException {
+ this.ctx = ctx;
+ this.framesLimit = framesLimit;
+ this.tableSize = tableSize;
+
+ this.keyFields = keyFields;
+ this.comparators = comparators;
+ this.merger = merger;
+ this.mergeState = merger.createAggregateStates();
+
+ this.inRecDesc = inRecDesc;
+
+ this.tupleBuilder = new ArrayTupleBuilder(inRecDesc.getFieldCount());
+
+ this.outAppender = new FrameTupleAppender(ctx.getFrameSize());
+
+ this.outputWriter = outputWriter;
+
+ this.outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+
+ this.tpc = tpc;
+
+ this.isLoadOptimized = loadOptimized;
+ }
+
+ public void initialize(LinkedList<RunFileReader> runFiles) throws HyracksDataException {
+
+ runs = runFiles;
+
+ try {
+ if (runs.size() <= 0) {
+ return;
+ } else {
+ inFrames = new ArrayList<ByteBuffer>();
+ outFrame = ctx.allocateFrame();
+ outAppender.reset(outFrame, true);
+ outFrameAccessor.reset(outFrame);
+ int runProcOffset = 0;
+ while (runs.size() > 0) {
+ try {
+ doPass(runs, runProcOffset);
+ if (runs.size() + 2 <= framesLimit) {
+ // final phase
+ runProcOffset = 0;
+ } else {
+ // one more merge level
+ runProcOffset++;
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ inFrames.clear();
+ }
+ } catch (Exception e) {
+ outputWriter.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ mergeState.close();
+ }
+ }
+
+ private void doPass(LinkedList<RunFileReader> runs, int offset) throws HyracksDataException {
+ FileReference newRun = null;
+ IFrameWriter writer = outputWriter;
+ boolean finalPass = false;
+
+ int runNumber = runs.size() - offset;
+
+ while (inFrames.size() + 2 < framesLimit) {
+ inFrames.add(ctx.allocateFrame());
+ }
+
+ if (runNumber + 2 <= framesLimit) {
+ finalPass = true;
+ if (isLoadOptimized)
+ runFrameLimit = (framesLimit - 2) / runNumber;
+ else
+ runFrameLimit = 1;
+ } else {
+ runFrameLimit = 1;
+ runNumber = framesLimit - 2;
+ newRun = ctx.getJobletContext().createManagedWorkspaceFile(
+ HybridHashSortGrouperBucketMerge.class.getSimpleName());
+ writer = new RunFileWriter(newRun, ctx.getIOManager());
+ writer.open();
+ }
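+ // Final pass: merge directly into the downstream writer, optionally buffering several
+ // frames per run. Intermediate pass: merge only (framesLimit - 2) runs into a new run file
+ // and leave the remaining runs for a later pass.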
+ try {
+ currentFrameIndexInRun = new int[runNumber];
+ currentRunFrames = new int[runNumber];
+ currentBucketInRun = new int[runNumber];
+ /**
+ * Create file readers for each input run file, only for
+ * the ones fit into the inFrames
+ */
+ RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+ FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+ Comparator<ReferenceHashEntry> comparator = createEntryComparator(comparators);
+ ReferencedBucketBasedPriorityQueue topTuples = new ReferencedBucketBasedPriorityQueue(ctx.getFrameSize(),
+ inRecDesc, runNumber, comparator, tpc, tableSize);
+ /**
+ * current tuple index in each run
+ */
+ int[] tupleIndices = new int[runNumber];
+
+ for (int i = 0; i < runNumber; i++) {
+ int runIndex = i + offset;
+ tupleIndices[i] = 0;
+ // Load the run file
+ runFileReaders[i] = runs.get(runIndex);
+ runFileReaders[i].open();
+
+ currentRunFrames[i] = 0;
+ currentFrameIndexInRun[i] = i * runFrameLimit;
+ for (int j = 0; j < runFrameLimit; j++) {
+ int frameIndex = currentFrameIndexInRun[i] + j;
+ boolean hasNextFrame = runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex));
+ if (hasNextFrame) {
+ tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ currentRunFrames[i]++;
+ if (j == 0) {
+ currentBucketInRun[i] = tpc.partition(tupleAccessors[frameIndex], tupleIndices[i],
+ tableSize);
+ setNextTopTuple(i, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Start merging
+ */
+ while (!topTuples.areRunsExhausted()) {
+ /**
+ * Get the top record
+ */
+ ReferenceEntry top = topTuples.peek();
+ int tupleIndex = top.getTupleIndex();
+ int runIndex = topTuples.peek().getRunid();
+
+ FrameTupleAccessor fta = top.getAccessor();
+
+ int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+ if (currentTupleInOutFrame < 0
+ || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+
+ tupleBuilder.reset();
+
+ for (int k = 0; k < keyFields.length; k++) {
+ tupleBuilder.addField(fta, tupleIndex, keyFields[k]);
+ }
+
+ merger.init(tupleBuilder, fta, tupleIndex, mergeState);
+
+ if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ flushOutFrame(writer, finalPass);
+ if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "The partial result is too large to be initialized in a frame.");
+ }
+ }
+
+ } else {
+ /**
+ * if new tuple is in the same group of the
+ * current aggregator do merge and output to the
+ * outFrame
+ */
+
+ merger.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, mergeState);
+
+ }
+ tupleIndices[runIndex]++;
+ setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+ }
+
+ if (outAppender.getTupleCount() > 0) {
+ flushOutFrame(writer, finalPass);
+ outAppender.reset(outFrame, true);
+ }
+
+ merger.close();
+
+ runs.subList(offset, offset + runNumber).clear();
+ /**
+ * insert the new run file back into the run file list at the
+ * current offset
+ */
+ if (!finalPass) {
+ runs.add(offset, ((RunFileWriter) writer).createReader());
+ }
+ } finally {
+ if (!finalPass) {
+ writer.close();
+ }
+ mergeState.reset();
+ }
+ }
+
+ private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
+
+ if (finalTupleBuilder == null) {
+ finalTupleBuilder = new ArrayTupleBuilder(inRecDesc.getFields().length);
+ }
+
+ if (writerFrame == null) {
+ writerFrame = ctx.allocateFrame();
+ }
+
+ if (writerAppender == null) {
+ writerAppender = new FrameTupleAppender(ctx.getFrameSize());
+ }
+ writerAppender.reset(writerFrame, true);
+
+ outFrameAccessor.reset(outFrame);
+
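+ // Rebuild every tuple in the output frame through the merger: the final pass
+ // emits final aggregation results, while intermediate passes emit partial
+ // results that will be merged again in a later pass.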
+ for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+
+ finalTupleBuilder.reset();
+
+ for (int k = 0; k < keyFields.length; k++) {
+ finalTupleBuilder.addField(outFrameAccessor, i, keyFields[k]);
+ }
+
+ if (isFinal) {
+
+ merger.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, mergeState);
+
+ } else {
+
+ merger.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, mergeState);
+ }
+
+ if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerAppender.reset(writerFrame, true);
+ if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+ finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+ throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
+ }
+ }
+ }
+ if (writerAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writerFrame, writer);
+ writerAppender.reset(writerFrame, true);
+ }
+
+ outAppender.reset(outFrame, true);
+ }
+
+ private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+ FrameTupleAccessor[] tupleAccessors, ReferencedBucketBasedPriorityQueue topTuples)
+ throws HyracksDataException {
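+ // Each run owns a contiguous block of runFrameLimit buffers in inFrames,
+ // starting at runIndex * runFrameLimit. The cursor advances tuple by tuple
+ // within the current buffer, then buffer by buffer within the block, and only
+ // when the whole block is consumed is the next batch of frames read from the run.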
+ int runStart = runIndex * runFrameLimit;
+ boolean existNext = false;
+ if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+ /**
+ * run already closed
+ */
+ existNext = false;
+ } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+ /**
+ * not the last frame for this run
+ */
+ existNext = true;
+ if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex]++;
+ }
+ } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+ /**
+ * still tuples left in the last buffered frame of this run
+ */
+ existNext = true;
+ } else {
+ /**
+ * all tuples in the buffered frames have been consumed;
+ * reload the next batch of frames from the run file
+ */
+ tupleIndices[runIndex] = 0;
+ currentFrameIndexInRun[runIndex] = runStart;
+ /**
+ * read in batch
+ */
+ currentRunFrames[runIndex] = 0;
+ for (int j = 0; j < runFrameLimit; j++) {
+ int frameIndex = currentFrameIndexInRun[runIndex] + j;
+ if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
+ tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+ existNext = true;
+ currentRunFrames[runIndex]++;
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (existNext) {
+ topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
+ } else {
+ topTuples.pop();
+ closeRun(runIndex, runCursors, tupleAccessors);
+ }
+ }
+
+ /**
+ * Close the run file, and also the corresponding readers and
+ * input frame.
+ *
+ * @param index
+ * @param runCursors
+ * @param tupleAccessor
+ * @throws HyracksDataException
+ */
+ private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+ throws HyracksDataException {
+ if (runCursors[index] != null) {
+ runCursors[index].close();
+ runCursors[index] = null;
+ int frameOffset = index * runFrameLimit;
+ for (int j = 0; j < runFrameLimit; j++) {
+ tupleAccessor[frameOffset + j] = null;
+ }
+ }
+ }
+
+ private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldLength(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldLength(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ private Comparator<ReferenceHashEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+ return new Comparator<ReferenceHashEntry>() {
+
+ @Override
+ public int compare(ReferenceHashEntry o1, ReferenceHashEntry o2) {
+ int cmp = o1.getHashValue() - o2.getHashValue();
+ if (cmp != 0) {
+ return cmp;
+ } else {
+ FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+ FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+ int j1 = o1.getTupleIndex();
+ int j2 = o2.getTupleIndex();
+ byte[] b1 = fta1.getBuffer().array();
+ byte[] b2 = fta2.getBuffer().array();
+ for (int f = 0; f < keyFields.length; ++f) {
+ int fIdx = f;
+ int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+ + fta1.getFieldStartOffset(j1, fIdx);
+ int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+ int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+ + fta2.getFieldStartOffset(j2, fIdx);
+ int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+ }
+
+ };
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
new file mode 100644
index 0000000..8695e0b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+public class HybridHashSortRunMerger {
+
+ private final IHyracksTaskContext ctx;
+ private final List<RunFileReader> runs;
+ private final int[] keyFields;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDesc;
+ private final int framesLimit;
+ private final int tableSize;
+ private final IFrameWriter writer;
+ private final IAggregatorDescriptor grouper;
+ private final ITuplePartitionComputer tpc;
+ private ByteBuffer outFrame;
+ private FrameTupleAppender outFrameAppender;
+ private final boolean isLoadBuffered;
+
+ public HybridHashSortRunMerger(IHyracksTaskContext ctx, LinkedList<RunFileReader> runs, int[] keyFields,
+ IBinaryComparator[] comparators, RecordDescriptor recordDesc, ITuplePartitionComputer tpc,
+ IAggregatorDescriptor grouper, int framesLimit, int tableSize, IFrameWriter writer, boolean isLoadBuffered) {
+ this.ctx = ctx;
+ this.runs = runs;
+ this.keyFields = keyFields;
+ this.comparators = comparators;
+ this.recordDesc = recordDesc;
+ this.framesLimit = framesLimit;
+ this.writer = writer;
+ this.isLoadBuffered = isLoadBuffered;
+ this.tableSize = tableSize;
+ this.tpc = tpc;
+ this.grouper = grouper;
+ }
+
+ public void process() throws HyracksDataException {
+
+ writer.open();
+ // FIXME
+ int mergeLevels = 0, mergeRunCount = 0;
+ try {
+
+ outFrame = ctx.allocateFrame();
+ outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outFrameAppender.reset(outFrame, true);
+
+ int maxMergeWidth = framesLimit - 1;
+ while (runs.size() > maxMergeWidth) {
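+ // Runs in front of generationSeparator are outputs produced at the current
+ // merge level; keeping them out of the merge inputs ensures that each level
+ // only merges runs generated by the previous level.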
+ int generationSeparator = 0;
+ // FIXME
+ int mergeRounds = 0;
+ while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
+ int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth), runs.size()
+ - maxMergeWidth + 1);
+ FileReference newRun = null;
+ IFrameWriter mergeResultWriter = this.writer;
+ newRun = ctx.createManagedWorkspaceFile(HybridHashSortRunMerger.class.getSimpleName());
+ mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
+ mergeResultWriter.open();
+ IFrameReader[] runCursors = new RunFileReader[mergeWidth];
+ for (int i = 0; i < mergeWidth; i++) {
+ runCursors[i] = runs.get(generationSeparator + i);
+ }
+ merge(mergeResultWriter, runCursors, false);
+ runs.subList(generationSeparator, generationSeparator + mergeWidth).clear();
+ runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
+ mergeRounds++;
+ }
+ mergeLevels++;
+ mergeRunCount += mergeRounds;
+ }
+ if (!runs.isEmpty()) {
+ IFrameReader[] runCursors = new RunFileReader[runs.size()];
+ for (int i = 0; i < runCursors.length; i++) {
+ runCursors[i] = runs.get(i);
+ }
+ merge(writer, runCursors, true);
+ }
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+
+ ctx.getCounterContext()
+ .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.runs.count", true)
+ .set(mergeRunCount);
+
+ ctx.getCounterContext()
+ .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.levels", true)
+ .set(mergeLevels);
+ }
+ }
+
+ private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors, boolean isFinal)
+ throws HyracksDataException {
+ // FIXME
+ long methodTimer = System.nanoTime();
+
+ IFrameReader merger = new GroupRunMergingFrameReader(ctx, runCursors, framesLimit, tableSize, keyFields, tpc,
+ comparators, grouper, recordDesc, isFinal, isLoadBuffered);
+ merger.open();
+ try {
+ while (merger.nextFrame(outFrame)) {
+ FrameUtils.flushFrame(outFrame, mergeResultWriter);
+ }
+ } finally {
+ merger.close();
+ }
+ ctx.getCounterContext()
+ .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.time", true)
+ .update(System.nanoTime() - methodTimer);
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java
new file mode 100644
index 0000000..3c91fea
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+
+public class ReferenceEntryWithBucketID extends ReferenceEntry {
+
+ private int bucketID;
+
+ public ReferenceEntryWithBucketID(int runid, FrameTupleAccessor fta, int tupleIndex, int bucketID) {
+ super(runid, fta, tupleIndex);
+ this.bucketID = bucketID;
+ }
+
+ public int getBucketID() {
+ return bucketID;
+ }
+
+ public void setBucketID(int bucketID) {
+ this.bucketID = bucketID;
+ }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java
new file mode 100644
index 0000000..394f0a8
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+
+public class ReferenceHashEntry extends ReferenceEntry {
+
+ private int hashValue;
+
+ public ReferenceHashEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int hashVal) {
+ super(runid, fta, tupleIndex);
+ this.hashValue = hashVal;
+ }
+
+ public int getHashValue() {
+ return hashValue;
+ }
+
+ public void setHashValue(int hashVal) {
+ this.hashValue = hashVal;
+ }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java
new file mode 100644
index 0000000..adfbe81
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Comparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+
+public class ReferencedBucketBasedPriorityQueue {
+
+ private final int frameSize;
+ private final RecordDescriptor recordDescriptor;
+ private final ReferenceHashEntry entries[];
+ private final int size;
+ private final BitSet runAvail;
+ private int nItems;
+ private final int tableSize;
+
+ private final Comparator<ReferenceHashEntry> comparator;
+
+ private final ITuplePartitionComputer tpc;
+
+ public ReferencedBucketBasedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
+ Comparator<ReferenceHashEntry> comparator, ITuplePartitionComputer tpc, int tableSize) {
+ this.frameSize = frameSize;
+ this.recordDescriptor = recordDescriptor;
+ if (initSize < 1)
+ throw new IllegalArgumentException();
+ this.comparator = comparator;
+ nItems = initSize;
+ size = (initSize + 1) & 0xfffffffe;
+ entries = new ReferenceHashEntry[size];
+ runAvail = new BitSet(size);
+ runAvail.set(0, initSize, true);
+ for (int i = 0; i < size; i++) {
+ entries[i] = new ReferenceHashEntry(i, null, -1, -1);
+ }
+ this.tpc = tpc;
+ this.tableSize = tableSize;
+ }
+
+ /**
+ * Retrieve the top entry without removing it
+ *
+ * @return the top entry
+ */
+ public ReferenceEntry peek() {
+ return entries[0];
+ }
+
+ /**
+ * Pop the top entry and replace it with the next tuple from the same run,
+ * then sift the replacement down to its proper position in the queue.
+ *
+ * @param fta the accessor holding the replacement tuple
+ * @param tIndex the index of the replacement tuple within the accessor
+ * @return runid of the replaced entry
+ * @throws HyracksDataException
+ */
+ public int popAndReplace(FrameTupleAccessor fta, int tIndex) throws HyracksDataException {
+ ReferenceHashEntry entry = entries[0];
+ if (entry.getAccessor() == null) {
+ entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
+ }
+ entry.getAccessor().reset(fta.getBuffer());
+ entry.setTupleIndex(tIndex);
+ entry.setHashValue(tpc.partition(fta, tIndex, tableSize));
+
+ add(entry);
+ return entry.getRunid();
+ }
+
+ /**
+ * Push entry into priority queue
+ *
+ * @param e
+ * the new Entry
+ * @throws HyracksDataException
+ */
+ private void add(ReferenceHashEntry e) throws HyracksDataException {
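+ // This mirrors a tournament (loser) tree: entries[] forms an implicit binary
+ // tree whose leaves correspond to runs. slot starts at the internal node above
+ // the popped run's leaf; at each node the loser of the comparison stays in
+ // place and the winner moves up, so the overall winner (the minimum) ends up
+ // in entries[0].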
+ ReferenceHashEntry min = entries[0];
+ int slot = (size >> 1) + (min.getRunid() >> 1);
+
+ ReferenceHashEntry curr = e;
+ while (!runAvail.isEmpty() && slot > 0) {
+ int c = 0;
+ if (!runAvail.get(entries[slot].getRunid())) {
+ // run of entries[slot] is exhausted, i.e. not available, curr
+ // wins
+ c = 1;
+ } else if (entries[slot].getAccessor() != null /*
+ * entries[slot] is
+ * not MIN value
+ */
+ && runAvail.get(curr.getRunid() /* curr run is available */)) {
+
+ if (curr.getAccessor() != null) {
+ c = comparator.compare(entries[slot], curr);
+ } else {
+ // curr is MIN value, wins
+ c = 1;
+ }
+ }
+
+ if (c <= 0) { // curr lost
+ // entries[slot] swaps up
+ ReferenceHashEntry tmp = entries[slot];
+ entries[slot] = curr;
+ curr = tmp;// winner to pass up
+ }// else curr wins
+ slot >>= 1;
+ }
+ // set new entries[0]
+ entries[0] = curr;
+ }
+
+ /**
+ * Pop is called only when a run is exhausted
+ *
+ * @return the entry of the run that has just been exhausted
+ * @throws HyracksDataException
+ */
+ public ReferenceHashEntry pop() throws HyracksDataException {
+ ReferenceHashEntry min = entries[0];
+ runAvail.clear(min.getRunid());
+ add(min);
+ nItems--;
+ return min;
+ }
+
+ public boolean areRunsExhausted() {
+ return runAvail.isEmpty();
+ }
+
+ public int size() {
+ return nItems;
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java
new file mode 100644
index 0000000..d9d5118
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java
@@ -0,0 +1,133 @@
+package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Comparator;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * TODO: this class should be merged with the ReferencedPriorityQueue in the util package
+ */
+public class ReferencedPriorityQueue {
+ private final int frameSize;
+ private final RecordDescriptor recordDescriptor;
+ private final ReferenceEntryWithBucketID entries[];
+ private final int size;
+ private final BitSet runAvail;
+ private int nItems;
+
+ private final Comparator<ReferenceEntryWithBucketID> comparator;
+
+ public ReferencedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
+ Comparator<ReferenceEntryWithBucketID> comparator) {
+ this.frameSize = frameSize;
+ this.recordDescriptor = recordDescriptor;
+ if (initSize < 1)
+ throw new IllegalArgumentException();
+ this.comparator = comparator;
+ nItems = initSize;
+ size = (initSize + 1) & 0xfffffffe;
+ entries = new ReferenceEntryWithBucketID[size];
+ runAvail = new BitSet(size);
+ runAvail.set(0, initSize, true);
+ for (int i = 0; i < size; i++) {
+ entries[i] = new ReferenceEntryWithBucketID(i, null, -1, -1);
+ }
+ }
+
+ /**
+ * Retrieve the top entry without removing it
+ *
+ * @return the top entry
+ */
+ public ReferenceEntryWithBucketID peek() {
+ return entries[0];
+ }
+
+ /**
+ * Pop the top entry and replace it with the next tuple from the same run,
+ * then sift the replacement down to its proper position in the queue.
+ *
+ * @param fta the accessor holding the replacement tuple
+ * @param tIndex the index of the replacement tuple within the accessor
+ * @param bucketID the hash bucket of the replacement tuple
+ * @return runid of the replaced entry
+ */
+ public int popAndReplace(FrameTupleAccessor fta, int tIndex, int bucketID) {
+ ReferenceEntryWithBucketID entry = entries[0];
+ if (entry.getAccessor() == null) {
+ entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
+ }
+ entry.getAccessor().reset(fta.getBuffer());
+ entry.setTupleIndex(tIndex);
+ entry.setBucketID(bucketID);
+
+ add(entry);
+ return entry.getRunid();
+ }
+
+ /**
+ * Push entry into priority queue
+ *
+ * @param e
+ * the new Entry
+ */
+ private void add(ReferenceEntryWithBucketID e) {
+ ReferenceEntryWithBucketID min = entries[0];
+ int slot = (size >> 1) + (min.getRunid() >> 1);
+
+ ReferenceEntryWithBucketID curr = e;
+ while (!runAvail.isEmpty() && slot > 0) {
+ int c = 0;
+ if (!runAvail.get(entries[slot].getRunid())) {
+ // run of entries[slot] is exhausted, i.e. not available, curr
+ // wins
+ c = 1;
+ } else if (entries[slot].getAccessor() != null /*
+ * entries[slot] is
+ * not MIN value
+ */
+ && runAvail.get(curr.getRunid() /* curr run is available */)) {
+
+ if (curr.getAccessor() != null) {
+ c = comparator.compare(entries[slot], curr);
+ } else {
+ // curr is MIN value, wins
+ c = 1;
+ }
+ }
+
+ if (c <= 0) { // curr lost
+ // entries[slot] swaps up
+ ReferenceEntryWithBucketID tmp = entries[slot];
+ entries[slot] = curr;
+ curr = tmp;// winner to pass up
+ }// else curr wins
+ slot >>= 1;
+ }
+ // set new entries[0]
+ entries[0] = curr;
+ }
+
+ /**
+ * Pop is called only when a run is exhausted
+ *
+ * @return the entry of the run that has just been exhausted
+ */
+ public ReferenceEntryWithBucketID pop() {
+ ReferenceEntryWithBucketID min = entries[0];
+ runAvail.clear(min.getRunid());
+ add(min);
+ nItems--;
+ return min;
+ }
+
+ public boolean areRunsExhausted() {
+ return runAvail.isEmpty();
+ }
+
+ public int size() {
+ return nItems;
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java
new file mode 100644
index 0000000..72bae76
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class FrameTupleAccessorForGroupHashtable implements IFrameTupleAccessor {
+ private final int frameSize;
+ private final RecordDescriptor recordDescriptor;
+
+ private final static int INT_SIZE = 4;
+
+ private ByteBuffer buffer;
+
+ public FrameTupleAccessorForGroupHashtable(int frameSize, RecordDescriptor recordDescriptor) {
+ this.frameSize = frameSize;
+ this.recordDescriptor = recordDescriptor;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldCount()
+ */
+ @Override
+ public int getFieldCount() {
+ return recordDescriptor.getFieldCount();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldSlotsLength()
+ */
+ @Override
+ public int getFieldSlotsLength() {
+ return getFieldCount() * 4;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldEndOffset(int, int)
+ */
+ @Override
+ public int getFieldEndOffset(int tupleIndex, int fIdx) {
+ return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldStartOffset(int, int)
+ */
+ @Override
+ public int getFieldStartOffset(int tupleIndex, int fIdx) {
+ return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldLength(int, int)
+ */
+ @Override
+ public int getFieldLength(int tupleIndex, int fIdx) {
+ return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleEndOffset(int)
+ */
+ @Override
+ public int getTupleEndOffset(int tupleIndex) {
+ return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1)) - 2 * INT_SIZE;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleStartOffset(int)
+ */
+ @Override
+ public int getTupleStartOffset(int tupleIndex) {
+ return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleCount()
+ */
+ @Override
+ public int getTupleCount() {
+ return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getBuffer()
+ */
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#reset(java.nio.ByteBuffer)
+ */
+ @Override
+ public void reset(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public int getTupleHashReferenceOffset(int tupleIndex) {
+ return getTupleEndOffset(tupleIndex);
+ }
+
+ public int getTupleEndOffsetWithHashReference(int tupleIndex) {
+ return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
+ }
+
+ public int getHashReferenceNextFrameIndex(int tupleIndex) {
+ return buffer.getInt(getTupleHashReferenceOffset(tupleIndex));
+ }
+
+ public int getHashReferenceNextTupleIndex(int tupleIndex) {
+ return buffer.getInt(getTupleHashReferenceOffset(tupleIndex) + INT_SIZE);
+ }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java
new file mode 100644
index 0000000..c5668f5
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+
+public class FrameTupleAppenderForGroupHashtable {
+ private final int frameSize;
+
+ private ByteBuffer buffer;
+
+ private int tupleCount;
+
+ private int tupleDataEndOffset;
+
+ public FrameTupleAppenderForGroupHashtable(int frameSize) {
+ this.frameSize = frameSize;
+ }
+
+ public void reset(ByteBuffer buffer, boolean clear) {
+ this.buffer = buffer;
+ if (clear) {
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
+ tupleCount = 0;
+ tupleDataEndOffset = 0;
+ } else {
+ tupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+ tupleDataEndOffset = tupleCount == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize)
+ - tupleCount * 4);
+ }
+ }
+
+ public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) {
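+ // Space check, as laid out by this appender: field end-offset slots
+ // (fieldSlots.length * 4) + tuple data (length) + two ints for the hash-chain
+ // reference appended after each tuple (2 * 4) + the tuple-count slot (4) + one
+ // 4-byte tuple end-offset slot per tuple, including the new one ((tupleCount + 1) * 4).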
+ if (tupleDataEndOffset + fieldSlots.length * 4 + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ for (int i = 0; i < fieldSlots.length; ++i) {
+ buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
+ }
+ System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + fieldSlots.length * 4, length);
+ buffer.putInt(tupleDataEndOffset + fieldSlots.length * 4 + length, -1);
+ buffer.putInt(tupleDataEndOffset + fieldSlots.length * 4 + length + 4, -1);
+ tupleDataEndOffset += fieldSlots.length * 4 + length + 2 * 4;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean append(byte[] bytes, int offset, int length) {
+ if (tupleDataEndOffset + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset, length);
+ buffer.putInt(tupleDataEndOffset + length, -1);
+ buffer.putInt(tupleDataEndOffset + length + 4, -1);
+ tupleDataEndOffset += length + 2 * 4;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) {
+ if (tupleDataEndOffset + fieldSlots.length * 4 + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ int effectiveSlots = 0;
+ for (int i = 0; i < fieldSlots.length; ++i) {
+ if (fieldSlots[i] > 0) {
+ buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
+ effectiveSlots++;
+ }
+ }
+ System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + effectiveSlots * 4, length);
+ buffer.putInt(tupleDataEndOffset + effectiveSlots * 4 + length, -1);
+ buffer.putInt(tupleDataEndOffset + effectiveSlots * 4 + length + 4, -1);
+ tupleDataEndOffset += effectiveSlots * 4 + length + 2 * 4;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
+ int length = tEndOffset - tStartOffset;
+ if (tupleDataEndOffset + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ ByteBuffer src = tupleAccessor.getBuffer();
+ System.arraycopy(src.array(), tStartOffset, buffer.array(), tupleDataEndOffset, length);
+ buffer.putInt(tupleDataEndOffset + length, -1);
+ buffer.putInt(tupleDataEndOffset + length + 4, -1);
+ tupleDataEndOffset += length + 2 * 4;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) {
+ int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
+ int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
+ return append(tupleAccessor, tStartOffset, tEndOffset);
+ }
+
+ public int getTupleCount() {
+ return tupleCount;
+ }
+
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+}
+
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
new file mode 100644
index 0000000..be92b84
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
@@ -0,0 +1,609 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+public class HybridHashGroupHashTable implements IFrameWriter {
+
+ private final static int HEADER_REF_EMPTY = -1;
+
+ private static final int INT_SIZE = 4;
+
+ private IHyracksTaskContext ctx;
+
+ private final int frameSize;
+
+ private final int framesLimit;
+
+ private final int tableSize;
+
+ private final int numOfPartitions;
+
+ private final IFrameWriter outputWriter;
+
+ private final IBinaryComparator[] comparators;
+
+ /**
+ * field indexes of the grouping keys, in the input and internal (stored) layouts
+ */
+ private final int[] inputKeys, internalKeys;
+
+ private final RecordDescriptor inputRecordDescriptor, outputRecordDescriptor;
+
+ /**
+ * tuple partitioner used for hashing into the in-memory hash table
+ */
+ private final ITuplePartitionComputer hashComputer;
+
+ /**
+ * tuple partitioner used for assigning spilled records to partitions
+ */
+ private final ITuplePartitionComputer partitionComputer;
+
+ /**
+ * Hash table headers
+ */
+ private ByteBuffer[] headers;
+
+ /**
+ * buffers for hash table
+ */
+ private ByteBuffer[] contents;
+
+ /**
+ * output buffers for spilled partitions
+ */
+ private ByteBuffer[] spilledPartOutputBuffers;
+
+ /**
+ * run writers for spilled partitions
+ */
+ private RunFileWriter[] spilledPartRunWriters;
+
+ private int[] spilledPartRunSizeArrayInFrames;
+ private int[] spilledPartRunSizeArrayInTuples;
+
+ private List<IFrameReader> spilledPartRunReaders = null;
+ private List<Integer> spilledRunAggregatedPages = null;
+ private List<Integer> spilledPartRunSizesInFrames = null;
+ private List<Integer> spilledPartRunSizesInTuples = null;
+
+ /**
+ * index of the current working buffer in hash table
+ */
+ private int currentHashTableFrame;
+
+ /**
+ * Aggregation state
+ */
+ private AggregateState htAggregateState;
+
+ /**
+ * the aggregator
+ */
+ private final IAggregatorDescriptor aggregator;
+
+ /**
+ * records inserted into the in-memory hash table (for hashing and aggregation)
+ */
+ private int hashedRawRecords = 0;
+
+ /**
+ * in-memory part size in tuples
+ */
+ private int hashedKeys = 0;
+
+ /**
+ * Hash table tuple pointer
+ */
+ private TuplePointer matchPointer;
+
+ /**
+ * Frame tuple accessor for input data frames
+ */
+ private FrameTupleAccessor inputFrameTupleAccessor;
+
+ /**
+ * flag for whether the hash table is full
+ */
+ private boolean isHashtableFull;
+
+ /**
+ * flag for partition-only mode (no hashing or aggregation)
+ */
+ private boolean isPartitionOnly;
+
+ /**
+ * Tuple accessor for hash table contents
+ */
+ private FrameTupleAccessorForGroupHashtable hashtableRecordAccessor;
+
+ private ArrayTupleBuilder internalTupleBuilder;
+
+ private FrameTupleAppender spilledPartInsertAppender;
+
+ private FrameTupleAppenderForGroupHashtable htInsertAppender;
+
+ public HybridHashGroupHashTable(IHyracksTaskContext ctx, int framesLimit, int tableSize, int numOfPartitions,
+ int[] keys, int hashSeedOffset, IBinaryComparator[] comparators, ITuplePartitionComputerFamily tpcFamily,
+ IAggregatorDescriptor aggregator, RecordDescriptor inputRecordDescriptor,
+ RecordDescriptor outputRecordDescriptor, IFrameWriter outputWriter) throws HyracksDataException {
+ this.ctx = ctx;
+ this.frameSize = ctx.getFrameSize();
+ this.tableSize = tableSize;
+ this.framesLimit = framesLimit;
+ this.numOfPartitions = numOfPartitions;
+ this.inputKeys = keys;
+ this.internalKeys = new int[keys.length];
+ for (int i = 0; i < internalKeys.length; i++) {
+ internalKeys[i] = i;
+ }
+
+ this.comparators = comparators;
+
+ this.inputRecordDescriptor = inputRecordDescriptor;
+ this.outputRecordDescriptor = outputRecordDescriptor;
+
+ this.outputWriter = outputWriter;
+
+ this.hashComputer = tpcFamily.createPartitioner(hashSeedOffset * 2);
+ this.partitionComputer = tpcFamily.createPartitioner(hashSeedOffset * 2 + 1);
+
+ this.aggregator = aggregator;
+
+ }
+
+ public static double getHashtableOverheadRatio(int tableSize, int frameSize, int framesLimit, int recordSizeInByte) {
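+ // A rough estimate, under the assumptions coded below: after subtracting the
+ // header pages and one reserved page, each record stored in the hash table
+ // costs recordSizeInByte plus 2 * INT_SIZE bytes of hash-chain reference; the
+ // ratio is the total memory in bytes over the raw record bytes that fit.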
+ int pagesForRecord = framesLimit - getHeaderPages(tableSize, frameSize);
+ int recordsInHashtable = (pagesForRecord - 1) * ((int) (frameSize / (recordSizeInByte + 2 * INT_SIZE)));
+
+ return (double) framesLimit * frameSize / recordsInHashtable / recordSizeInByte;
+ }
+
+ public static int getHeaderPages(int tableSize, int frameSize) {
+ return (int) Math.ceil((double)tableSize * INT_SIZE * 2 / frameSize);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ // initialize hash headers
+ int htHeaderCount = getHeaderPages(tableSize, frameSize);
+
+ isPartitionOnly = false;
+ if (numOfPartitions >= framesLimit - htHeaderCount) {
+ isPartitionOnly = true;
+ }
+
+ if (isPartitionOnly) {
+ htHeaderCount = 0;
+ }
+
+ headers = new ByteBuffer[htHeaderCount];
+
+ // initialize hash table contents
+ contents = new ByteBuffer[framesLimit - htHeaderCount - numOfPartitions];
+ currentHashTableFrame = 0;
+ isHashtableFull = false;
+
+ // initialize hash table aggregate state
+ htAggregateState = aggregator.createAggregateStates();
+
+ // initialize partition information
+ spilledPartOutputBuffers = new ByteBuffer[numOfPartitions];
+ spilledPartRunWriters = new RunFileWriter[numOfPartitions];
+ spilledPartRunSizeArrayInFrames = new int[numOfPartitions];
+ spilledPartRunSizeArrayInTuples = new int[numOfPartitions];
+
+ // initialize other helper classes
+ inputFrameTupleAccessor = new FrameTupleAccessor(frameSize, inputRecordDescriptor);
+ internalTupleBuilder = new ArrayTupleBuilder(outputRecordDescriptor.getFieldCount());
+ spilledPartInsertAppender = new FrameTupleAppender(frameSize);
+
+ htInsertAppender = new FrameTupleAppenderForGroupHashtable(frameSize);
+ matchPointer = new TuplePointer();
+ hashtableRecordAccessor = new FrameTupleAccessorForGroupHashtable(frameSize, outputRecordDescriptor);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inputFrameTupleAccessor.reset(buffer);
+ int tupleCount = inputFrameTupleAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ insert(inputFrameTupleAccessor, i);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // no additional cleanup here; spilled run writers are finalized in close()
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (spilledPartRunWriters[i] != null) {
+ spilledPartRunWriters[i].close();
+ }
+ }
+ htAggregateState.close();
+ }
+
+ private void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
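+ // Overall flow: in partition-only mode, route the record straight to its spilled
+ // partition; otherwise probe the hash table and aggregate in place on a match; on
+ // a miss, either spill the record (when the table is full) or append a new entry
+ // and link it into the bucket's chain.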
+
+ if (isPartitionOnly) {
+ // for partition only
+ int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
+ insertSpilledPartition(accessor, tupleIndex, pid);
+ spilledPartRunSizeArrayInTuples[pid]++;
+ return;
+ }
+
+ int hid = hashComputer.partition(accessor, tupleIndex, tableSize);
+
+ if (findMatch(hid, accessor, tupleIndex)) {
+ // found a match: aggregate into the existing entry
+ hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
+ aggregator.aggregate(accessor, tupleIndex, hashtableRecordAccessor, matchPointer.tupleIndex,
+ htAggregateState);
+ hashedRawRecords++;
+ } else {
+ if (isHashtableFull) {
+ // when hash table is full: spill the record
+ int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
+ insertSpilledPartition(accessor, tupleIndex, pid);
+ spilledPartRunSizeArrayInTuples[pid]++;
+ } else {
+ // insert a new entry into the hash table
+ internalTupleBuilder.reset();
+ for (int k = 0; k < inputKeys.length; k++) {
+ internalTupleBuilder.addField(accessor, tupleIndex, inputKeys[k]);
+ }
+
+ aggregator.init(internalTupleBuilder, accessor, tupleIndex, htAggregateState);
+
+ if (contents[currentHashTableFrame] == null) {
+ contents[currentHashTableFrame] = ctx.allocateFrame();
+ }
+
+ htInsertAppender.reset(contents[currentHashTableFrame], false);
+ if (!htInsertAppender.append(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+ // the current frame is full: move on to the next hash table frame
+ currentHashTableFrame++;
+ if (currentHashTableFrame >= contents.length) {
+ // no more frames to allocate: stop expanding the hash table
+ isHashtableFull = true;
+
+ // reinsert the record
+ insert(accessor, tupleIndex);
+
+ return;
+ } else {
+ if (contents[currentHashTableFrame] == null) {
+ contents[currentHashTableFrame] = ctx.allocateFrame();
+ }
+
+ htInsertAppender.reset(contents[currentHashTableFrame], true);
+
+ if (!htInsertAppender.append(internalTupleBuilder.getFieldEndOffsets(),
+ internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "Failed to insert an aggregation partial result into the in-memory hash table: it has the length of "
+ + internalTupleBuilder.getSize() + " and fields "
+ + internalTupleBuilder.getFieldEndOffsets().length);
+ }
+
+ }
+ }
+
+ // update hash table reference
+ if (matchPointer.frameIndex < 0) {
+ // need to initialize the hash table header
+ int headerFrameIndex = getHeaderFrameIndex(hid);
+ int headerFrameOffset = getHeaderTupleIndex(hid);
+
+ if (headers[headerFrameIndex] == null) {
+ headers[headerFrameIndex] = ctx.allocateFrame();
+ resetHeader(headerFrameIndex);
+ }
+
+ headers[headerFrameIndex].putInt(headerFrameOffset, currentHashTableFrame);
+ headers[headerFrameIndex]
+ .putInt(headerFrameOffset + INT_SIZE, htInsertAppender.getTupleCount() - 1);
+ } else {
+ // update the previous reference
+ hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
+ int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(matchPointer.tupleIndex);
+ contents[matchPointer.frameIndex].putInt(refOffset, currentHashTableFrame);
+ contents[matchPointer.frameIndex]
+ .putInt(refOffset + INT_SIZE, htInsertAppender.getTupleCount() - 1);
+ }
+
+ hashedKeys++;
+ hashedRawRecords++;
+ }
+ }
+ }
+
+ /**
+ * Insert record into a spilled partition, by directly copying the tuple into the output buffer.
+ *
+ * @param accessor
+ * @param tupleIndex
+ * @param pid
+ */
+ private void insertSpilledPartition(FrameTupleAccessor accessor, int tupleIndex, int pid)
+ throws HyracksDataException {
+
+ if (spilledPartOutputBuffers[pid] == null) {
+ spilledPartOutputBuffers[pid] = ctx.allocateFrame();
+ }
+
+ spilledPartInsertAppender.reset(spilledPartOutputBuffers[pid], false);
+
+ if (!spilledPartInsertAppender.append(accessor, tupleIndex)) {
+ // the output buffer is full: flush
+ flushSpilledPartitionOutputBuffer(pid);
+ // reset the output buffer
+ spilledPartInsertAppender.reset(spilledPartOutputBuffers[pid], true);
+
+ if (!spilledPartInsertAppender.append(accessor, tupleIndex)) {
+ throw new HyracksDataException("Failed to insert a record into a spilled partition!");
+ }
+ }
+
+ }
+
+ /**
+ * Flush a spilled partition's output buffer.
+ *
+ * @param pid
+ * @throws HyracksDataException
+ */
+ private void flushSpilledPartitionOutputBuffer(int pid) throws HyracksDataException {
+ if (spilledPartRunWriters[pid] == null) {
+ spilledPartRunWriters[pid] = new RunFileWriter(
+ ctx.createManagedWorkspaceFile("HashHashPrePartitionHashTable"), ctx.getIOManager());
+ spilledPartRunWriters[pid].open();
+ }
+
+ FrameUtils.flushFrame(spilledPartOutputBuffers[pid], spilledPartRunWriters[pid]);
+
+ spilledPartRunSizeArrayInFrames[pid]++;
+ }
+
+ /**
+ * Hash table lookup
+ *
+ * @param hid
+ * @param accessor
+ * @param tupleIndex
+ * @return
+ */
+ private boolean findMatch(int hid, FrameTupleAccessor accessor, int tupleIndex) {
+
+ matchPointer.frameIndex = -1;
+ matchPointer.tupleIndex = -1;
+
+ // get reference in the header
+ int headerFrameIndex = getHeaderFrameIndex(hid);
+ int headerFrameOffset = getHeaderTupleIndex(hid);
+
+ if (headers[headerFrameIndex] == null) {
+ return false;
+ }
+
+ // initialize the pointer to the first record
+ int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
+ int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
+
+ while (entryFrameIndex >= 0) {
+ matchPointer.frameIndex = entryFrameIndex;
+ matchPointer.tupleIndex = entryTupleIndex;
+ hashtableRecordAccessor.reset(contents[entryFrameIndex]);
+ if (compare(accessor, tupleIndex, hashtableRecordAccessor, entryTupleIndex) == 0) {
+ return true;
+ }
+ // Move to the next record in this entry following the linked list
+ int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
+ int prevFrameIndex = entryFrameIndex;
+ entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
+ entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
+ }
+
+ return false;
+ }
+
+ public void finishup() throws HyracksDataException {
+ // spill all output buffers
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (spilledPartOutputBuffers[i] != null) {
+ flushSpilledPartitionOutputBuffer(i);
+ }
+ }
+ spilledPartOutputBuffers = null;
+
+ // flush in-memory aggregation results; the spilled-partition output buffers have been released, so the flush frame stays within the frame budget
+ ByteBuffer outputBuffer = ctx.allocateFrame();
+ FrameTupleAppender outputBufferAppender = new FrameTupleAppender(frameSize);
+ outputBufferAppender.reset(outputBuffer, true);
+
+ ArrayTupleBuilder outFlushTupleBuilder = new ArrayTupleBuilder(outputRecordDescriptor.getFieldCount());
+
+ for (ByteBuffer htFrame : contents) {
+ if (htFrame == null) {
+ continue;
+ }
+ hashtableRecordAccessor.reset(htFrame);
+ int tupleCount = hashtableRecordAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ outFlushTupleBuilder.reset();
+
+ for (int k = 0; k < internalKeys.length; k++) {
+ outFlushTupleBuilder.addField(hashtableRecordAccessor, i, internalKeys[k]);
+ }
+
+ aggregator.outputFinalResult(outFlushTupleBuilder, hashtableRecordAccessor, i, htAggregateState);
+
+ if (!outputBufferAppender.append(outFlushTupleBuilder.getFieldEndOffsets(),
+ outFlushTupleBuilder.getByteArray(), 0, outFlushTupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, outputWriter);
+ outputBufferAppender.reset(outputBuffer, true);
+
+ if (!outputBufferAppender.append(outFlushTupleBuilder.getFieldEndOffsets(),
+ outFlushTupleBuilder.getByteArray(), 0, outFlushTupleBuilder.getSize())) {
+ throw new HyracksDataException(
+ "Failed to flush a record from in-memory hash table: record has length of "
+ + outFlushTupleBuilder.getSize() + " and fields "
+ + outFlushTupleBuilder.getFieldEndOffsets().length);
+ }
+ }
+ }
+ }
+
+ if (outputBufferAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffer, outputWriter);
+ }
+
+ // create run readers and statistic information for spilled runs
+ spilledPartRunReaders = new LinkedList<IFrameReader>();
+ spilledRunAggregatedPages = new LinkedList<Integer>();
+ spilledPartRunSizesInFrames = new LinkedList<Integer>();
+ spilledPartRunSizesInTuples = new LinkedList<Integer>();
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (spilledPartRunWriters[i] != null) {
+ spilledPartRunReaders.add(spilledPartRunWriters[i].createReader());
+ spilledRunAggregatedPages.add(0);
+ spilledPartRunWriters[i].close();
+ spilledPartRunSizesInFrames.add(spilledPartRunSizeArrayInFrames[i]);
+ spilledPartRunSizesInTuples.add(spilledPartRunSizeArrayInTuples[i]);
+ }
+ }
+ }
+
+ /**
+ * Compare an input record with a hash table entry.
+ *
+ * @param accessor
+ * @param tupleIndex
+ * @param hashAccessor
+ * @param hashTupleIndex
+ * @return
+ */
+ private int compare(FrameTupleAccessor accessor, int tupleIndex, FrameTupleAccessorForGroupHashtable hashAccessor,
+ int hashTupleIndex) {
+ int tStart0 = accessor.getTupleStartOffset(tupleIndex);
+ int fStartOffset0 = accessor.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = hashAccessor.getTupleStartOffset(hashTupleIndex);
+ int fStartOffset1 = hashAccessor.getFieldSlotsLength() + tStart1;
+
+ for (int i = 0; i < internalKeys.length; ++i) {
+ int fStart0 = accessor.getFieldStartOffset(tupleIndex, inputKeys[i]);
+ int fEnd0 = accessor.getFieldEndOffset(tupleIndex, inputKeys[i]);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = hashAccessor.getFieldStartOffset(hashTupleIndex, internalKeys[i]);
+ int fEnd1 = hashAccessor.getFieldEndOffset(hashTupleIndex, internalKeys[i]);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = comparators[i].compare(accessor.getBuffer().array(), fStart0 + fStartOffset0, fLen0, hashAccessor
+ .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Get the header frame index of the given hash table entry
+ *
+ * @param entry
+ * @return
+ */
+ private int getHeaderFrameIndex(int entry) {
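+ // Each hash table bucket owns 2 * INT_SIZE consecutive bytes in the header
+ // frames (the frame index and tuple index of the first record in its chain);
+ // this method and getHeaderTupleIndex translate a bucket number into the
+ // header frame index and the byte offset within that frame.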
+ int frameIndex = (entry / frameSize * 2 * INT_SIZE) + (entry % frameSize * 2 * INT_SIZE / frameSize);
+ return frameIndex;
+ }
+
+ /**
+ * Get the tuple index of the given hash table entry
+ *
+ * @param entry
+ * @return
+ */
+ private int getHeaderTupleIndex(int entry) {
+ int offset = (entry % frameSize) * 2 * INT_SIZE % frameSize;
+ return offset;
+ }
+
+ /**
+ * reset the header page.
+ *
+ * @param headerFrameIndex
+ */
+ private void resetHeader(int headerFrameIndex) {
+ for (int i = 0; i < frameSize; i += INT_SIZE) {
+ headers[headerFrameIndex].putInt(i, HEADER_REF_EMPTY);
+ }
+ }
+
+ public List<Integer> getSpilledRunsSizeInTuples() throws HyracksDataException {
+ return spilledPartRunSizesInTuples;
+ }
+
+ public int getHashedUniqueKeys() throws HyracksDataException {
+ return hashedKeys;
+ }
+
+ public int getHashedRawRecords() throws HyracksDataException {
+ return hashedRawRecords;
+ }
+
+ public List<Integer> getSpilledRunsAggregatedPages() throws HyracksDataException {
+ return spilledRunAggregatedPages;
+ }
+
+ public List<IFrameReader> getSpilledRuns() throws HyracksDataException {
+ return spilledPartRunReaders;
+ }
+
+ public List<Integer> getSpilledRunsSizeInPages() throws HyracksDataException {
+ return spilledPartRunSizesInFrames;
+ }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
new file mode 100644
index 0000000..8a94c9f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.hashsort.HybridHashSortGroupHashTable;
+import edu.uci.ics.hyracks.dataflow.std.group.hashsort.HybridHashSortRunMerger;
+
+public class HybridHashGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final double HYBRID_FALLBACK_THRESHOLD = 0.8;
+
+    // safety magnifier applied to the estimated input size before computing the partition count
+ private static final double ESTIMATOR_MAGNIFIER = 1.2;
+
+ // input key fields
+ private final int[] keyFields;
+
+ // intermediate and final key fields
+ private final int[] storedKeyFields;
+
+    /**
+     * Input size as the count of raw records.
+     */
+ private final long inputSizeInRawRecords;
+
+ /**
+ * Input size as the count of the unique keys.
+ */
+ private final long inputSizeInUniqueKeys;
+
+ // hash table size
+ private final int tableSize;
+
+    // estimated record size in bytes: used for computing the fudge factor
+ private final int userProvidedRecordSizeInBytes;
+
+ // aggregator
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+
+    // merger, used in case of falling back to the hash-sort algorithm due to hash skew
+ private final IAggregatorDescriptorFactory mergerFactory;
+
+ // for the sort fall-back algorithm
+ private final INormalizedKeyComputerFactory firstNormalizerFactory;
+
+ // total memory in pages
+ private final int framesLimit;
+
+ // comparator factories for key fields.
+ private final IBinaryComparatorFactory[] comparatorFactories;
+
+    /**
+     * Hash function families for the key fields: a hash function family is needed since there may be
+     * more than one level of hashing.
+     */
+ private final IBinaryHashFunctionFamily[] hashFamilies;
+
+    /**
+     * Flag indicating whether the input size estimate should be adjusted using the statistics observed at runtime.
+     */
+ private final boolean doInputAdjustment;
+
+ private final static double FUDGE_FACTOR_ESTIMATION = 1.2;
+
+ public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+ long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
+ IBinaryComparatorFactory[] comparatorFactories, IBinaryHashFunctionFamily[] hashFamilies,
+ int hashFuncStartLevel, INormalizedKeyComputerFactory firstNormalizerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+ RecordDescriptor outRecDesc) throws HyracksDataException {
+ this(spec, keyFields, framesLimit, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+ comparatorFactories, hashFamilies, hashFuncStartLevel, firstNormalizerFactory, aggregatorFactory,
+ mergerFactory, outRecDesc, true);
+ }
+
+ public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+ long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
+ IBinaryComparatorFactory[] comparatorFactories, IBinaryHashFunctionFamily[] hashFamilies,
+ int hashFuncStartLevel, INormalizedKeyComputerFactory firstNormalizerFactory,
+ IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+ RecordDescriptor outRecDesc, boolean doInputAdjustment) throws HyracksDataException {
+ super(spec, 1, 1);
+ this.framesLimit = framesLimit;
+ this.tableSize = tableSize;
+ this.userProvidedRecordSizeInBytes = recordSizeInBytes;
+
+ this.inputSizeInRawRecords = inputSizeInRawRecords;
+ this.inputSizeInUniqueKeys = inputSizeInUniqueKeys;
+
+ if (framesLimit <= 3) {
+            // at least 2 frames are needed for the in-memory hash table and 1 for the output buffer,
+            // so more than 3 frames in total are required
+            throw new HyracksDataException(
+                    "Not enough memory for Hash-Hash Aggregation algorithm: more than 3 frames are necessary, but only "
+                            + framesLimit + " available.");
+ }
+
+ this.keyFields = keyFields;
+ storedKeyFields = new int[keyFields.length];
+ for (int i = 0; i < storedKeyFields.length; i++) {
+ storedKeyFields[i] = i;
+ }
+
+ this.aggregatorFactory = aggregatorFactory;
+
+ this.mergerFactory = mergerFactory;
+ this.firstNormalizerFactory = firstNormalizerFactory;
+
+ this.comparatorFactories = comparatorFactories;
+
+ this.hashFamilies = hashFamilies;
+
+ recordDescriptors[0] = outRecDesc;
+
+ this.doInputAdjustment = doInputAdjustment;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparators.length; i++) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+
+ final RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+
+ final int frameSize = ctx.getFrameSize();
+
+ final double fudgeFactor = HybridHashGroupHashTable.getHashtableOverheadRatio(tableSize, frameSize,
+ framesLimit, userProvidedRecordSizeInBytes) * FUDGE_FACTOR_ESTIMATION;
+
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+ HybridHashGroupHashTable topProcessor;
+
+ int observedInputSizeInFrames;
+
+ int userProvidedInputSizeInFrames;
+
+ boolean topLevelFallbackCheck = true;
+
+ ITuplePartitionComputerFamily tpcf = new FieldHashPartitionComputerFamily(keyFields, hashFamilies);
+
+ ITuplePartitionComputerFamily tpcfMerge = new FieldHashPartitionComputerFamily(storedKeyFields,
+ hashFamilies);
+
+ ByteBuffer readAheadBuf;
+
+            /**
+             * Compute the number of spilling partitions using the hybrid-hash formula.
+             *
+             * @param tableSize
+             *            the hash table size, in entries
+             * @param framesLimit
+             *            the memory budget, in frames
+             * @param inputKeySize
+             *            the estimated size of the unique keys of the input, in frames
+             * @param factor
+             *            the fudge factor
+             * @return the number of partitions to be created
+             */
+ private int getNumberOfPartitions(int tableSize, int framesLimit, int inputKeySize, double factor) {
+
+ int hashtableHeaderPages = HybridHashGroupHashTable.getHeaderPages(tableSize, frameSize);
+
+ int numberOfPartitions = HybridHashUtil.hybridHashPartitionComputer((int) Math.ceil(inputKeySize),
+ framesLimit, factor);
+
+                // if the partition count exceeds the frames left after the hash table headers, fall back to pure partitioning
+ if (numberOfPartitions >= framesLimit - hashtableHeaderPages) {
+ numberOfPartitions = framesLimit;
+ }
+
+ if (numberOfPartitions <= 0) {
+ numberOfPartitions = 1;
+ }
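+
+                // Illustrative example (assumed numbers, not from the original patch): with
+                // inputKeySize = 200 frames of unique keys, framesLimit = 64 and factor = 1.4,
+                // the hybrid-hash formula gives ceil((200 * 1.4 - 64) / (64 - 1)) = ceil(216 / 63) = 4
+                // spilling partitions, which is kept since it stays below framesLimit minus the
+                // hash table header pages.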
+
+ return numberOfPartitions;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+
+ observedInputSizeInFrames = 0;
+
+ // estimate the number of unique keys for this partition, given the total raw record count and unique record count
+ long estimatedNumberOfUniqueKeys = HybridHashUtil.getEstimatedPartitionSizeOfUniqueKeys(
+ inputSizeInRawRecords, inputSizeInUniqueKeys, 1);
+
+                // cast to double so that the division does not truncate before Math.ceil
+                userProvidedInputSizeInFrames = (int) Math.ceil((double) estimatedNumberOfUniqueKeys
+                        * userProvidedRecordSizeInBytes / frameSize);
+
+ int topPartitions = getNumberOfPartitions(tableSize, framesLimit,
+ (int) Math.ceil(userProvidedInputSizeInFrames * ESTIMATOR_MAGNIFIER), fudgeFactor);
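+
+                // Illustrative sketch of the sizing above (assumed numbers, not from the original
+                // patch): with 1,000,000 estimated unique keys, 64-byte records and 32768-byte
+                // frames, userProvidedInputSizeInFrames = ceil(64,000,000 / 32,768) = 1954 frames,
+                // which is magnified to ceil(1954 * 1.2) = 2345 frames before the partition count
+                // is computed by getNumberOfPartitions() above.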
+
+ topProcessor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize, topPartitions, keyFields, 0,
+ comparators, tpcf, aggregatorFactory.createAggregator(ctx, inRecDesc, recordDescriptors[0],
+ keyFields, storedKeyFields), inRecDesc, recordDescriptors[0], writer);
+
+ writer.open();
+ topProcessor.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ observedInputSizeInFrames++;
+ topProcessor.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+                // propagate the failure to the downstream writer
+                writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ topProcessor.finishup();
+
+ List<IFrameReader> runs = topProcessor.getSpilledRuns();
+ List<Integer> runsSizeInFrames = topProcessor.getSpilledRunsSizeInPages();
+
+ // get statistics from the hash table
+ int hashedKeys = topProcessor.getHashedUniqueKeys();
+ int hashedRawRecords = topProcessor.getHashedRawRecords();
+
+                // get the raw record count of each spilled partition (in records, not in pages)
+ List<Integer> partitionRawRecordsCount = topProcessor.getSpilledRunsSizeInTuples();
+
+ topProcessor.close();
+
+                // get a new estimate of the number of unique keys in the input data set: if the previous level was
+                // pure partitioning, use the size provided to this level; otherwise, compute the key ratio of the
+                // data set from the keys already processed by the hash table.
+ int newKeySizeInPages = (doInputAdjustment && hashedRawRecords > 0) ? (int) Math
+ .ceil((double) hashedKeys / hashedRawRecords * observedInputSizeInFrames) : (int) Math
+ .ceil(userProvidedInputSizeInFrames);
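+
+                // Illustrative example of the adjustment (assumed numbers, not from the original
+                // patch): if 1000 input frames were observed and the hash table absorbed 100,000
+                // raw records into 40,000 groups, then newKeySizeInPages = ceil(0.4 * 1000) = 400.
+                // A spilled run of 250 frames is then estimated at ceil(400 * 250 / 1000) = 100
+                // pages of keys; since 100 <= 0.8 * 400, it is processed recursively, whereas an
+                // estimate above 320 pages would trigger the hash-sort fallback below.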
+
+ IFrameReader runReader;
+ int runSizeInFrames;
+ int partitionRawRecords;
+
+ while (!runs.isEmpty()) {
+
+ runReader = runs.remove(0);
+ runSizeInFrames = runsSizeInFrames.remove(0);
+ partitionRawRecords = partitionRawRecordsCount.remove(0);
+
+ // compute the estimated key size in frames for the run file
+ int runKeySize;
+
+ if (doInputAdjustment && hashedRawRecords > 0)
+ runKeySize = (int) Math.ceil((double) newKeySizeInPages * runSizeInFrames
+ / observedInputSizeInFrames);
+ else
+ runKeySize = (int) Math.ceil((double) userProvidedInputSizeInFrames * partitionRawRecords
+ / inputSizeInRawRecords);
+
+ if (topLevelFallbackCheck && runKeySize > HYBRID_FALLBACK_THRESHOLD * newKeySizeInPages) {
+ fallBack(runReader, runSizeInFrames, runKeySize, 1);
+ } else {
+ processRunFiles(runReader, runKeySize, 1);
+ }
+ }
+
+ writer.close();
+
+ }
+
+ private void processRunFiles(IFrameReader runReader, int uniqueKeysOfRunFileInFrames, int runLevel)
+ throws HyracksDataException {
+
+ boolean checkFallback = true;
+
+ int numOfPartitions = getNumberOfPartitions(tableSize, framesLimit, uniqueKeysOfRunFileInFrames,
+ fudgeFactor);
+
+ HybridHashGroupHashTable processor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize,
+ numOfPartitions, keyFields, runLevel, comparators, tpcf, aggregatorFactory.createAggregator(
+ ctx, inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
+ recordDescriptors[0], writer);
+
+ processor.open();
+
+ runReader.open();
+
+ int inputRunRawSizeInFrames = 0, inputRunRawSizeInTuples = 0;
+
+ if (readAheadBuf == null) {
+ readAheadBuf = ctx.allocateFrame();
+ }
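+                // Each frame stores its tuple count in its last 4 bytes, which is read below to
+                // accumulate the raw record count of this run.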
+ while (runReader.nextFrame(readAheadBuf)) {
+ inputRunRawSizeInFrames++;
+ inputRunRawSizeInTuples += readAheadBuf.getInt(readAheadBuf.capacity() - 4);
+ processor.nextFrame(readAheadBuf);
+ }
+
+ runReader.close();
+
+ processor.finishup();
+
+ List<IFrameReader> runs = processor.getSpilledRuns();
+ List<Integer> runSizes = processor.getSpilledRunsSizeInPages();
+ List<Integer> partitionRawRecords = processor.getSpilledRunsSizeInTuples();
+
+ int directFlushKeysInTuples = processor.getHashedUniqueKeys();
+ int directFlushRawRecordsInTuples = processor.getHashedRawRecords();
+
+ processor.close();
+
+ int newKeySizeInPages = (doInputAdjustment && directFlushRawRecordsInTuples > 0) ? (int) Math
+ .ceil((double) directFlushKeysInTuples / directFlushRawRecordsInTuples
+ * inputRunRawSizeInFrames) : uniqueKeysOfRunFileInFrames;
+
+ IFrameReader recurRunReader;
+ int runSizeInPages, subPartitionRawRecords;
+
+ while (!runs.isEmpty()) {
+ recurRunReader = runs.remove(0);
+ runSizeInPages = runSizes.remove(0);
+ subPartitionRawRecords = partitionRawRecords.remove(0);
+
+ int newRunKeySize;
+
+ if (doInputAdjustment && directFlushRawRecordsInTuples > 0) {
+ // do adjustment
+ newRunKeySize = (int) Math.ceil((double) newKeySizeInPages * runSizeInPages
+ / inputRunRawSizeInFrames);
+ } else {
+ // no adjustment
+ newRunKeySize = (int) Math.ceil((double) subPartitionRawRecords * uniqueKeysOfRunFileInFrames
+ / inputRunRawSizeInTuples);
+ }
+
+ if (checkFallback && newRunKeySize > HYBRID_FALLBACK_THRESHOLD * newKeySizeInPages) {
+ fallBack(recurRunReader, runSizeInPages, newRunKeySize, runLevel);
+ } else {
+ processRunFiles(recurRunReader, newRunKeySize, runLevel + 1);
+ }
+
+ }
+ }
+
+ private void fallBack(IFrameReader recurRunReader, int runSizeInPages, int runKeySizeInPages, int runLevel)
+ throws HyracksDataException {
+ fallbackHashSortAlgorithm(recurRunReader, runLevel + 1);
+ }
+
+ private void fallbackHashSortAlgorithm(IFrameReader recurRunReader, int runLevel)
+ throws HyracksDataException {
+ // fall back
+ FrameTupleAccessor runFrameTupleAccessor = new FrameTupleAccessor(frameSize, inRecDesc);
+ HybridHashSortGroupHashTable hhsTable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
+ keyFields, comparators, tpcf.createPartitioner(runLevel + 1),
+ firstNormalizerFactory.createNormalizedKeyComputer(), aggregatorFactory.createAggregator(ctx,
+ inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
+ recordDescriptors[0]);
+
+ recurRunReader.open();
+ if (readAheadBuf == null) {
+ readAheadBuf = ctx.allocateFrame();
+ }
+ while (recurRunReader.nextFrame(readAheadBuf)) {
+ runFrameTupleAccessor.reset(readAheadBuf);
+ int tupleCount = runFrameTupleAccessor.getTupleCount();
+ for (int j = 0; j < tupleCount; j++) {
+ hhsTable.insert(runFrameTupleAccessor, j);
+ }
+ }
+
+ recurRunReader.close();
+ hhsTable.finishup();
+
+ LinkedList<RunFileReader> hhsRuns = hhsTable.getRunFileReaders();
+
+ if (hhsRuns.isEmpty()) {
+ hhsTable.flushHashtableToOutput(writer);
+ hhsTable.close();
+ } else {
+ hhsTable.close();
+ HybridHashSortRunMerger hhsMerger = new HybridHashSortRunMerger(ctx, hhsRuns, storedKeyFields,
+ comparators, recordDescriptors[0], tpcfMerge.createPartitioner(runLevel + 1),
+ mergerFactory.createAggregator(ctx, recordDescriptors[0], recordDescriptors[0],
+ storedKeyFields, storedKeyFields), framesLimit, tableSize, writer, false);
+ hhsMerger.process();
+ }
+ }
+
+ };
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java
new file mode 100644
index 0000000..666c172
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public class HybridHashPartitionGenerateFrameWriter implements IFrameWriter {
+ private final IHyracksTaskContext ctx;
+
+ private RunFileWriter[] partitions;
+
+ private int[] partitionRawSizesInFrames;
+ private int[] partitionRawSizesInTuples;
+
+ private List<IFrameReader> partitionRunReaders;
+ private List<Integer> partitionRunAggregatedPages;
+ private List<Integer> partitionSizeInFrames;
+ private List<Integer> partitionSizeInTuples;
+
+ private ByteBuffer[] outputBuffers;
+
+ private final int numOfPartitions;
+
+ private final ITuplePartitionComputer tpc;
+
+ private final FrameTupleAccessor inFrameTupleAccessor;
+
+ private final FrameTupleAppender outFrameTupleAppender;
+
+ public HybridHashPartitionGenerateFrameWriter(IHyracksTaskContext ctx, int numOfPartitions,
+ ITuplePartitionComputer tpc, RecordDescriptor inRecDesc) {
+ this.ctx = ctx;
+ this.numOfPartitions = numOfPartitions;
+ this.tpc = tpc;
+ this.partitions = new RunFileWriter[numOfPartitions];
+ this.outputBuffers = new ByteBuffer[numOfPartitions];
+ this.partitionRawSizesInTuples = new int[numOfPartitions];
+ this.partitionRawSizesInFrames = new int[numOfPartitions];
+ this.inFrameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
+ this.outFrameTupleAppender = new FrameTupleAppender(ctx.getFrameSize());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#open()
+ */
+ @Override
+ public void open() throws HyracksDataException {
+        // nothing to do here: output buffers and run file writers are allocated lazily in nextFrame()
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer)
+ */
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inFrameTupleAccessor.reset(buffer);
+ int tupleCount = inFrameTupleAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ int pid = tpc.partition(inFrameTupleAccessor, i, numOfPartitions);
+
+ ctx.getCounterContext().getCounter("optional.partition.insert.count", true).update(1);
+
+ if (outputBuffers[pid] == null) {
+ outputBuffers[pid] = ctx.allocateFrame();
+ }
+ outFrameTupleAppender.reset(outputBuffers[pid], false);
+
+ if (!outFrameTupleAppender.append(inFrameTupleAccessor, i)) {
+ // flush the output buffer
+ if (partitions[pid] == null) {
+ partitions[pid] = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
+ HybridHashPartitionGenerateFrameWriter.class.getSimpleName()), ctx.getIOManager());
+ partitions[pid].open();
+ }
+ FrameUtils.flushFrame(outputBuffers[pid], partitions[pid]);
+ partitionRawSizesInFrames[pid]++;
+ outFrameTupleAppender.reset(outputBuffers[pid], true);
+ if (!outFrameTupleAppender.append(inFrameTupleAccessor, i)) {
+ throw new HyracksDataException(
+ "Failed to insert a record into its partition: the record size is too large. ");
+ }
+ }
+ partitionRawSizesInTuples[pid]++;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#fail()
+ */
+ @Override
+ public void fail() throws HyracksDataException {
+ throw new HyracksDataException("Failed on hash partitioning.");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#close()
+ */
+ @Override
+ public void close() throws HyracksDataException {
+ outputBuffers = null;
+ for (RunFileWriter partWriter : partitions) {
+ if (partWriter != null)
+ partWriter.close();
+ }
+ }
+
+ public void finishup() throws HyracksDataException {
+ for (int i = 0; i < outputBuffers.length; i++) {
+ if (outputBuffers[i] == null) {
+ continue;
+ }
+ if (partitions[i] == null) {
+ partitions[i] = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
+ HybridHashPartitionGenerateFrameWriter.class.getSimpleName()), ctx.getIOManager());
+ partitions[i].open();
+ }
+ outFrameTupleAppender.reset(outputBuffers[i], false);
+ if (outFrameTupleAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffers[i], partitions[i]);
+ partitionRawSizesInFrames[i]++;
+ outputBuffers[i] = null;
+ }
+ }
+
+ partitionRunReaders = new LinkedList<IFrameReader>();
+ partitionSizeInFrames = new LinkedList<Integer>();
+ partitionSizeInTuples = new LinkedList<Integer>();
+ partitionRunAggregatedPages = new LinkedList<Integer>();
+
+ for (int i = 0; i < numOfPartitions; i++) {
+ if (partitions[i] != null) {
+ partitionRunReaders.add(partitions[i].createReader());
+ partitionRunAggregatedPages.add(0);
+ partitions[i].close();
+ partitionSizeInFrames.add(partitionRawSizesInFrames[i]);
+ partitionSizeInTuples.add(partitionRawSizesInTuples[i]);
+ }
+ }
+
+ }
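+
+    // Expected call sequence, inferred from the code above rather than from a documented
+    // contract: open() -> nextFrame(...) for every input frame -> finishup() to flush the
+    // residual output buffers and create the run readers -> getSpilledRuns() /
+    // getSpilledRunsSizeInPages() -> close().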
+
+ public List<IFrameReader> getSpilledRuns() throws HyracksDataException {
+ return partitionRunReaders;
+ }
+
+ public List<Integer> getSpilledRunsSizeInPages() throws HyracksDataException {
+ return partitionSizeInFrames;
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java
new file mode 100644
index 0000000..5323887
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
+
+public class HybridHashUtil {
+
+    /**
+     * Compute the expected number of spilling partitions (the in-memory partition is not included), using the
+     * hybrid-hash formula from [Shapiro86]. A result of 0 means that no spilling partition is needed.
+     *
+     * @param inputSizeOfUniqueKeysInFrames
+     *            the size of the unique keys of the input, in frames
+     * @param memorySizeInFrames
+     *            the memory budget, in frames
+     * @param fudgeFactor
+     *            the fudge factor accounting for the hash table overhead
+     * @return the number of spilling partitions
+     */
+ public static int hybridHashPartitionComputer(int inputSizeOfUniqueKeysInFrames, int memorySizeInFrames,
+ double fudgeFactor) {
+ return Math.max(
+ (int) Math.ceil((inputSizeOfUniqueKeysInFrames * fudgeFactor - memorySizeInFrames)
+ / (memorySizeInFrames - 1)), 0);
+ }
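+
+    // Illustrative example for the formula above (assumed numbers): with 100 frames of unique
+    // keys, a fudge factor of 1.2 and 50 frames of memory, the result is
+    //   max(ceil((100 * 1.2 - 50) / (50 - 1)), 0) = ceil(70 / 49) = 2
+    // spilling partitions; when the fudged input already fits in memory the formula yields 0.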
+
+    /**
+     * Compute the estimated number of unique keys in one partition of a data set, using Yao's formula.
+     *
+     * @param inputSizeInRawRecords
+     *            the total number of raw records in the input
+     * @param inputSizeInUniqueKeys
+     *            the total number of unique keys in the input
+     * @param numOfPartitions
+     *            the number of partitions
+     * @return the expected number of unique keys in one partition
+     */
+ public static long getEstimatedPartitionSizeOfUniqueKeys(long inputSizeInRawRecords, long inputSizeInUniqueKeys,
+ int numOfPartitions) {
+ if (numOfPartitions == 1) {
+ return inputSizeInUniqueKeys;
+ }
+ return (long) Math.ceil(inputSizeInUniqueKeys
+ * (1 - Math.pow(1 - ((double) inputSizeInRawRecords / (double) numOfPartitions)
+ / (double) inputSizeInRawRecords, (double) inputSizeInRawRecords
+ / (double) inputSizeInUniqueKeys)));
+ }
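+
+    // Illustrative example for Yao's formula above (assumed numbers): with 1,000,000 raw
+    // records, 100,000 unique keys and 4 partitions, each partition is expected to contain
+    //   ceil(100000 * (1 - (1 - 0.25)^10)) = ceil(100000 * 0.9437) = 94369
+    // unique keys, i.e. most but not all of the distinct keys appear in every partition.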
+}