Clean up extra files left over after the merge (how were these even still around?). Merge complete.
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java
deleted file mode 100644
index d63609e..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/GroupRunMergingFrameReader.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-
-public class GroupRunMergingFrameReader implements IFrameReader {
-
- private static final int INT_SIZE = 4;
-
- private final IHyracksTaskContext ctx;
- private final IFrameReader[] runCursors;
- private final List<ByteBuffer> inFrames;
- private final int[] keyFields;
- private final int framesLimit;
- private final int tableSize;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDesc;
- private final FrameTupleAppender outFrameAppender;
- private final ITuplePartitionComputer tpc;
- private ReferencedPriorityQueue topTuples;
- private int[] tupleIndexes;
- private int[] currentFrameIndexForRuns, bufferedFramesForRuns;
- private FrameTupleAccessor[] tupleAccessors;
- private int framesBuffered;
-
- private final IAggregatorDescriptor grouper;
- private final AggregateState groupState;
-
- private final boolean isLoadBuffered;
-
- private final boolean isFinalPhase;
-
- private final ArrayTupleBuilder groupTupleBuilder, outputTupleBuilder;
-
- private byte[] groupResultCache;
- private ByteBuffer groupResultCacheBuffer;
- private IFrameTupleAccessor groupResultCacheAccessor;
- private FrameTupleAppender groupResultCacheAppender;
-
- // FIXME
- long queueCompCounter = 0, mergeCompCounter = 0;
-
- public GroupRunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, int framesLimit,
- int tableSize, int[] keyFields, ITuplePartitionComputer tpc, IBinaryComparator[] comparators,
- IAggregatorDescriptor grouper, RecordDescriptor recordDesc, boolean isFinalPhase) {
- this(ctx, runCursors, framesLimit, tableSize, keyFields, tpc, comparators, grouper, recordDesc, isFinalPhase,
- false);
- }
-
- public GroupRunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, int framesLimit,
- int tableSize, int[] keyFields, ITuplePartitionComputer tpc, IBinaryComparator[] comparators,
- IAggregatorDescriptor grouper, RecordDescriptor recordDesc, boolean isFinalPhase, boolean isLoadBuffered) {
- this.ctx = ctx;
- this.runCursors = runCursors;
- this.inFrames = new ArrayList<ByteBuffer>();
- this.keyFields = keyFields;
- this.tableSize = tableSize;
- this.comparators = comparators;
- this.recordDesc = recordDesc;
- this.grouper = grouper;
- this.groupState = grouper.createAggregateStates();
- this.outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
- this.isLoadBuffered = isLoadBuffered;
- this.isFinalPhase = isFinalPhase;
- this.framesLimit = framesLimit;
- this.tpc = tpc;
-
- this.groupTupleBuilder = new ArrayTupleBuilder(recordDesc.getFieldCount());
- this.outputTupleBuilder = new ArrayTupleBuilder(recordDesc.getFieldCount());
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameReader#open()
- */
- @Override
- public void open() throws HyracksDataException {
- if (isLoadBuffered) {
- while (inFrames.size() + 1 < framesLimit) {
- inFrames.add(ctx.allocateFrame());
- }
- framesBuffered = inFrames.size() / runCursors.length;
- } else {
- while (inFrames.size() < framesLimit - 1 && inFrames.size() < runCursors.length) {
- inFrames.add(ctx.allocateFrame());
- }
- framesBuffered = 1;
- }
- tupleAccessors = new FrameTupleAccessor[runCursors.length];
- currentFrameIndexForRuns = new int[runCursors.length];
- bufferedFramesForRuns = new int[runCursors.length];
- Comparator<ReferenceEntryWithBucketID> comparator = createEntryComparator(comparators);
- topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator);
- tupleIndexes = new int[runCursors.length];
-
- for (int i = 0; i < runCursors.length; i++) {
- int runIndex = topTuples.peek().getRunid();
- tupleIndexes[runIndex] = 0;
- runCursors[runIndex].open();
- for (int j = 0; j < framesBuffered; j++) {
-
- if (runCursors[runIndex].nextFrame(inFrames.get(runIndex * framesBuffered + j))) {
-
- bufferedFramesForRuns[runIndex]++;
- if (j == 0) {
- tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
- tupleAccessors[runIndex].reset(inFrames.get(runIndex * framesBuffered + j));
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- currentFrameIndexForRuns[runIndex] = runIndex * framesBuffered;
- }
- } else {
- break;
- }
- }
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameReader#nextFrame(java.nio.ByteBuffer)
- */
- @Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- outFrameAppender.reset(buffer, true);
-
- while (!topTuples.areRunsExhausted()) {
- ReferenceEntryWithBucketID top = topTuples.peek();
- int runIndex = top.getRunid();
- FrameTupleAccessor fta = top.getAccessor();
- int tupleIndex = top.getTupleIndex();
-
- // check whether we can do aggregation
- boolean needInsert = true;
- if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
- groupResultCacheAccessor.reset(ByteBuffer.wrap(groupResultCache));
- if (compareFrameTuples(fta, tupleIndex, groupResultCacheAccessor, 0) == 0) {
- needInsert = false;
- }
- }
-
- if (needInsert) {
-
- // try to flush the group cache into the output buffer, if any
- if (groupResultCacheAccessor != null && groupResultCacheAccessor.getFieldCount() > 0) {
- outputTupleBuilder.reset();
- for (int k = 0; k < keyFields.length; k++) {
- outputTupleBuilder.addField(groupResultCacheAccessor, 0, k);
- }
- if (isFinalPhase) {
- grouper.outputFinalResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
- } else {
- grouper.outputPartialResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
- }
-
- // return if the buffer is full
- if (!outFrameAppender.append(outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
- return true;
- }
- groupResultCacheBuffer.putInt(groupResultCache.length - 4, 0);
- }
-
- groupTupleBuilder.reset();
- for (int k : keyFields) {
- groupTupleBuilder.addField(fta, tupleIndex, k);
- }
- grouper.init(groupTupleBuilder, fta, tupleIndex, groupState);
-
- // enlarge the cache buffer if necessary
- int requiredSize = groupTupleBuilder.getSize() + groupTupleBuilder.getFieldEndOffsets().length
- * INT_SIZE + 2 * INT_SIZE;
-
- if (groupResultCache == null || groupResultCache.length < requiredSize) {
- groupResultCache = new byte[requiredSize];
- groupResultCacheAppender = new FrameTupleAppender(groupResultCache.length);
- groupResultCacheBuffer = ByteBuffer.wrap(groupResultCache);
- groupResultCacheAccessor = new FrameTupleAccessor(groupResultCache.length, recordDesc);
- }
-
- // always reset the group cache
- groupResultCacheAppender.reset(groupResultCacheBuffer, true);
- if (!groupResultCacheAppender.append(groupTupleBuilder.getFieldEndOffsets(),
- groupTupleBuilder.getByteArray(), 0, groupTupleBuilder.getSize())) {
- throw new HyracksDataException("The partial result is too large to be initialized in a frame.");
- }
-
- groupResultCacheAccessor.reset(groupResultCacheBuffer);
-
- } else {
- grouper.aggregate(fta, tupleIndex, groupResultCacheAccessor, 0, groupState);
- }
-
- ++tupleIndexes[runIndex];
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- }
-
- if (groupResultCacheAccessor != null && groupResultCacheAccessor.getTupleCount() > 0) {
- outputTupleBuilder.reset();
- for (int k = 0; k < keyFields.length; k++) {
- outputTupleBuilder.addField(groupResultCacheAccessor, 0, k);
- }
- if (isFinalPhase) {
- grouper.outputFinalResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
- } else {
- grouper.outputPartialResult(outputTupleBuilder, groupResultCacheAccessor, 0, groupState);
- }
-
- // return if the buffer is full
- if (!outFrameAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(), 0,
- outputTupleBuilder.getSize())) {
- return true;
- }
-
- groupResultCacheAccessor = null;
- groupResultCache = null;
- groupResultCacheBuffer = null;
- groupResultCacheAppender = null;
- }
-
- if (outFrameAppender.getTupleCount() > 0) {
- return true;
- }
-
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameReader#close()
- */
- @Override
- public void close() throws HyracksDataException {
- for (int i = 0; i < runCursors.length; ++i) {
- closeRun(i, runCursors, tupleAccessors);
- }
- }
-
- private void setNextTopTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
- boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
- if (exists) {
- int h = tpc.partition(tupleAccessors[runIndex], tupleIndexes[runIndex], tableSize);
- topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex], h);
- } else {
- topTuples.pop();
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- private boolean hasNextTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
- if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
- return false;
- } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
- if (currentFrameIndexForRuns[runIndex] - runIndex * framesBuffered < bufferedFramesForRuns[runIndex] - 1) {
- currentFrameIndexForRuns[runIndex]++;
- } else {
- bufferedFramesForRuns[runIndex] = 0;
- for (int j = 0; j < framesBuffered; j++) {
- if (runCursors[runIndex].nextFrame(inFrames.get(runIndex * framesBuffered + j))) {
- bufferedFramesForRuns[runIndex]++;
- } else {
- break;
- }
- }
- currentFrameIndexForRuns[runIndex] = runIndex * framesBuffered;
- }
- if (bufferedFramesForRuns[runIndex] > 0) {
- tupleAccessors[runIndex].reset(inFrames.get(currentFrameIndexForRuns[runIndex]));
- tupleIndexes[runIndex] = 0;
- return true;
- } else {
- return false;
- }
- } else {
- return true;
- }
- }
-
- private void closeRun(int index, IFrameReader[] runCursors, IFrameTupleAccessor[] tupleAccessors)
- throws HyracksDataException {
- if (runCursors[index] != null) {
- runCursors[index].close();
- runCursors[index] = null;
- tupleAccessors[index] = null;
- }
- }
-
- private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
- mergeCompCounter++;
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldLength(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
- int l2_start = fta2.getFieldStartOffset(j2, fIdx);
- int l2_end = fta2.getFieldEndOffset(j2, fIdx);
- int l2 = l2_end - l2_start;
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
-
- private Comparator<ReferenceEntryWithBucketID> createEntryComparator(final IBinaryComparator[] comparators) {
- return new Comparator<ReferenceEntryWithBucketID>() {
- public int compare(ReferenceEntryWithBucketID tp1, ReferenceEntryWithBucketID tp2) {
-
- queueCompCounter++;
-
- int cmp = tp1.getBucketID() - tp2.getBucketID();
-
- if (cmp != 0) {
- return cmp;
- }
-
- FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
- int j1 = tp1.getTupleIndex();
- int j2 = tp2.getTupleIndex();
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = keyFields[f];
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
-
- return cmp;
- }
- };
- }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
deleted file mode 100644
index 6e85cff..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
+++ /dev/null
@@ -1,686 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.FrameTupleAccessorForGroupHashtable;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.FrameTupleAppenderForGroupHashtable;
-import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
-
-public class HybridHashSortGroupHashTable {
-
- protected static final int INT_SIZE = 4;
- protected static final int INIT_REF_COUNT = 8;
- protected static final int PTR_SIZE = 3;
-
- protected final int tableSize, framesLimit, frameSize;
-
- protected final ByteBuffer[] headers;
- protected final ByteBuffer[] contents;
-
- protected final IHyracksTaskContext ctx;
-
- protected int currentLargestFrameIndex;
- protected int totalTupleCount;
-
- protected final IAggregatorDescriptor aggregator;
- protected final AggregateState aggState;
-
- protected final int[] keys, internalKeys;
-
- private final IBinaryComparator[] comparators;
-
- protected final ITuplePartitionComputer tpc;
-
- protected final INormalizedKeyComputer firstNormalizer;
-
- private ByteBuffer outputBuffer;
-
- private LinkedList<RunFileReader> runReaders;
-
- protected TuplePointer matchPointer;
-
- protected final FrameTupleAccessorForGroupHashtable hashtableRecordAccessor;
-
- private final FrameTupleAccessorForGroupHashtable compFrameAccessor1, compFrameAccessor2;
-
- protected final FrameTupleAppenderForGroupHashtable internalAppender;
-
- private final FrameTupleAppender outputAppender;
-
- /**
- * Tuple builder for hash table insertion
- */
- protected final ArrayTupleBuilder internalTupleBuilder, outputTupleBuilder;
-
- /**
- * pointers for sort records in an entry
- */
- protected int[] tPointers;
-
- protected int usedEntries = 0;
-
- protected long hashedKeys = 0, hashedRawRec = 0;
-
- public HybridHashSortGroupHashTable(IHyracksTaskContext ctx, int frameLimits, int tableSize, int[] keys,
- IBinaryComparator[] comparators, ITuplePartitionComputer tpc,
- INormalizedKeyComputer firstNormalizerComputer, IAggregatorDescriptor aggregator,
- RecordDescriptor inRecDesc, RecordDescriptor outRecDesc) {
- this.ctx = ctx;
- this.tableSize = tableSize;
- this.framesLimit = frameLimits;
- this.frameSize = ctx.getFrameSize();
-
- this.keys = keys;
- this.internalKeys = new int[keys.length];
- for (int i = 0; i < internalKeys.length; i++) {
- internalKeys[i] = i;
- }
-
- this.aggregator = aggregator;
- this.aggState = aggregator.createAggregateStates();
-
- this.tpc = tpc;
- this.comparators = comparators;
- this.firstNormalizer = firstNormalizerComputer;
-
- // initialize the hash table
- int residual = ((tableSize % frameSize) * INT_SIZE * 2) % frameSize == 0 ? 0 : 1;
- this.headers = new ByteBuffer[tableSize / frameSize * INT_SIZE * 2 + tableSize % frameSize * 2 * INT_SIZE
- / frameSize + residual];
-
- this.outputBuffer = ctx.allocateFrame();
-
- this.contents = new ByteBuffer[framesLimit - 1 - headers.length];
- this.currentLargestFrameIndex = -1;
- this.totalTupleCount = 0;
-
- this.runReaders = new LinkedList<RunFileReader>();
- this.hashtableRecordAccessor = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
- this.compFrameAccessor1 = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
- this.compFrameAccessor2 = new FrameTupleAccessorForGroupHashtable(frameSize, outRecDesc);
-
- this.internalTupleBuilder = new ArrayTupleBuilder(outRecDesc.getFieldCount());
- this.outputTupleBuilder = new ArrayTupleBuilder(outRecDesc.getFieldCount());
- this.internalAppender = new FrameTupleAppenderForGroupHashtable(frameSize);
- this.outputAppender = new FrameTupleAppender(frameSize);
-
- this.matchPointer = new TuplePointer();
-
- }
-
- /**
- * Reset the header page
- *
- * @param headerFrameIndex
- */
- protected void resetHeader(int headerFrameIndex) {
- for (int i = 0; i < frameSize; i += INT_SIZE) {
- headers[headerFrameIndex].putInt(i, -1);
- }
- }
-
- /**
- * Get the header frame index of the given hash table entry
- *
- * @param entry
- * @return
- */
- protected int getHeaderFrameIndex(int entry) {
- int frameIndex = entry / frameSize * 2 * INT_SIZE + entry % frameSize * 2 * INT_SIZE / frameSize;
- return frameIndex;
- }
-
- /**
- * Get the tuple index of the given hash table entry
- *
- * @param entry
- * @return
- */
- protected int getHeaderTupleIndex(int entry) {
- int offset = entry % frameSize * 2 * INT_SIZE % frameSize;
- return offset;
- }
-
- public void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
-
- int entry = tpc.partition(accessor, tupleIndex, tableSize);
-
- hashedRawRec++;
-
- if (findMatch(entry, accessor, tupleIndex)) {
- // find match; do aggregation
- hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
- aggregator.aggregate(accessor, tupleIndex, hashtableRecordAccessor, matchPointer.tupleIndex, aggState);
- } else {
-
- internalTupleBuilder.reset();
- for (int k = 0; k < keys.length; k++) {
- internalTupleBuilder.addField(accessor, tupleIndex, keys[k]);
- }
- aggregator.init(internalTupleBuilder, accessor, tupleIndex, aggState);
- int insertFrameIndex = -1, insertTupleIndex = -1;
- boolean inserted = false;
-
- if (currentLargestFrameIndex < 0) {
- currentLargestFrameIndex = 0;
- }
-
- if (contents[currentLargestFrameIndex] == null) {
- contents[currentLargestFrameIndex] = ctx.allocateFrame();
- }
-
- internalAppender.reset(contents[currentLargestFrameIndex], false);
- if (internalAppender.append(internalTupleBuilder.getFieldEndOffsets(), internalTupleBuilder.getByteArray(),
- 0, internalTupleBuilder.getSize())) {
- inserted = true;
- insertFrameIndex = currentLargestFrameIndex;
- insertTupleIndex = internalAppender.getTupleCount() - 1;
- }
-
- if (!inserted && currentLargestFrameIndex < contents.length - 1) {
- currentLargestFrameIndex++;
- if (contents[currentLargestFrameIndex] == null) {
- contents[currentLargestFrameIndex] = ctx.allocateFrame();
- }
- internalAppender.reset(contents[currentLargestFrameIndex], true);
- if (!internalAppender.append(internalTupleBuilder.getFieldEndOffsets(),
- internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
- throw new HyracksDataException("Failed to insert an aggregation value.");
- } else {
- insertFrameIndex = currentLargestFrameIndex;
- insertTupleIndex = internalAppender.getTupleCount() - 1;
- inserted = true;
- }
- }
-
- // memory is full
- if (!inserted) {
- // flush hash table and try to insert again
- flush();
-
- // update the match point to the header reference
- matchPointer.frameIndex = -1;
- matchPointer.tupleIndex = -1;
- // re-insert
- currentLargestFrameIndex++;
- if (contents[currentLargestFrameIndex] == null) {
- contents[currentLargestFrameIndex] = ctx.allocateFrame();
- }
- internalAppender.reset(contents[currentLargestFrameIndex], true);
- if (!internalAppender.append(internalTupleBuilder.getFieldEndOffsets(),
- internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
- throw new HyracksDataException("Failed to insert an aggregation value.");
- } else {
- insertFrameIndex = currentLargestFrameIndex;
- insertTupleIndex = internalAppender.getTupleCount() - 1;
- }
- }
-
- // no match; new insertion
- if (matchPointer.frameIndex < 0) {
- // first record for this entry; update the header references
- int headerFrameIndex = getHeaderFrameIndex(entry);
- int headerFrameOffset = getHeaderTupleIndex(entry);
- if (headers[headerFrameIndex] == null) {
- headers[headerFrameIndex] = ctx.allocateFrame();
- resetHeader(headerFrameIndex);
- }
- headers[headerFrameIndex].putInt(headerFrameOffset, insertFrameIndex);
- headers[headerFrameIndex].putInt(headerFrameOffset + INT_SIZE, insertTupleIndex);
- usedEntries++;
-
- } else {
- // update the previous reference
- hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
- int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(matchPointer.tupleIndex);
- contents[matchPointer.frameIndex].putInt(refOffset, insertFrameIndex);
- contents[matchPointer.frameIndex].putInt(refOffset + INT_SIZE, insertTupleIndex);
- }
- hashedKeys++;
- totalTupleCount++;
- }
- }
-
- /**
- * Flush the hash table directly to the output
- */
- public void flushHashtableToOutput(IFrameWriter outputWriter) throws HyracksDataException {
-
- outputAppender.reset(outputBuffer, true);
- for (int i = 0; i < contents.length; i++) {
- if (contents[i] == null) {
- continue;
- }
- hashtableRecordAccessor.reset(contents[i]);
- int tupleCount = hashtableRecordAccessor.getTupleCount();
- for (int j = 0; j < tupleCount; j++) {
- outputTupleBuilder.reset();
-
- int tupleOffset = hashtableRecordAccessor.getTupleStartOffset(j);
- int fieldOffset = hashtableRecordAccessor.getFieldCount() * INT_SIZE;
-
- for (int k = 0; k < internalKeys.length; k++) {
- outputTupleBuilder.addField(hashtableRecordAccessor.getBuffer().array(), tupleOffset + fieldOffset
- + hashtableRecordAccessor.getFieldStartOffset(j, k),
- hashtableRecordAccessor.getFieldLength(j, k));
- }
-
- aggregator.outputFinalResult(outputTupleBuilder, hashtableRecordAccessor, j, aggState);
-
- if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(),
- 0, outputTupleBuilder.getSize())) {
-
- FrameUtils.flushFrame(outputBuffer, outputWriter);
-
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
- throw new HyracksDataException("Failed to flush the hash table to the final output");
- }
- }
- }
- }
-
- if (outputAppender.getTupleCount() > 0) {
-
- FrameUtils.flushFrame(outputBuffer, outputWriter);
-
- outputAppender.reset(outputBuffer, true);
- }
-
- totalTupleCount = 0;
- usedEntries = 0;
- }
-
- /**
- * Flush hash table into a run file.
- *
- * @throws HyracksDataException
- */
- protected void flush() throws HyracksDataException {
-
- long methodTimer = System.nanoTime();
-
- FileReference runFile;
- try {
- runFile = ctx.getJobletContext().createManagedWorkspaceFile(
- HybridHashSortGroupHashTable.class.getSimpleName());
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- RunFileWriter runWriter = new RunFileWriter(runFile, ctx.getIOManager());
- runWriter.open();
- flushEntries(runWriter);
- runWriter.close();
- runReaders.add(runWriter.createReader());
- reset();
-
- ctx.getCounterContext()
- .getCounter("optional." + HybridHashSortGroupHashTable.class.getSimpleName() + ".flush.time", true)
- .update(System.nanoTime() - methodTimer);
- }
-
- private void flushEntries(IFrameWriter writer) throws HyracksDataException {
-
- outputAppender.reset(outputBuffer, true);
- for (int i = 0; i < tableSize; i++) {
- int tupleInEntry = sortEntry(i);
-
- for (int ptr = 0; ptr < tupleInEntry; ptr++) {
- int frameIndex = tPointers[ptr * PTR_SIZE];
- int tupleIndex = tPointers[ptr * PTR_SIZE + 1];
-
- hashtableRecordAccessor.reset(contents[frameIndex]);
- outputTupleBuilder.reset();
-
- int tupleOffset = hashtableRecordAccessor.getTupleStartOffset(tupleIndex);
- int fieldOffset = hashtableRecordAccessor.getFieldCount() * INT_SIZE;
-
- for (int k = 0; k < internalKeys.length; k++) {
- outputTupleBuilder.addField(hashtableRecordAccessor.getBuffer().array(), tupleOffset + fieldOffset
- + hashtableRecordAccessor.getFieldStartOffset(tupleIndex, k),
- hashtableRecordAccessor.getFieldLength(tupleIndex, k));
- }
-
- aggregator.outputPartialResult(outputTupleBuilder, hashtableRecordAccessor, tupleIndex, aggState);
-
- if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(), outputTupleBuilder.getByteArray(),
- 0, outputTupleBuilder.getSize())) {
-
- FrameUtils.flushFrame(outputBuffer, writer);
-
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(outputTupleBuilder.getFieldEndOffsets(),
- outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
- throw new HyracksDataException("Failed to flush an aggregation result.");
- }
- }
- totalTupleCount--;
- }
-
- if (tupleInEntry > 0) {
- usedEntries--;
- }
- }
-
- if (outputAppender.getTupleCount() > 0) {
-
- FrameUtils.flushFrame(outputBuffer, writer);
-
- outputAppender.reset(outputBuffer, true);
- }
- }
-
- protected int sortEntry(int entryID) {
-
- if (tPointers == null)
- tPointers = new int[INIT_REF_COUNT * PTR_SIZE];
- int ptr = 0;
-
- int headerFrameIndex = entryID / frameSize * 2 * INT_SIZE + (entryID % frameSize) * 2 * INT_SIZE / frameSize;
- int headerFrameOffset = (entryID % frameSize) * 2 * INT_SIZE % frameSize;
-
- if (headers[headerFrameIndex] == null) {
- return 0;
- }
-
- int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
- int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
-
- do {
- if (entryFrameIndex < 0) {
- break;
- }
- hashtableRecordAccessor.reset(contents[entryFrameIndex]);
- tPointers[ptr * PTR_SIZE] = entryFrameIndex;
- tPointers[ptr * PTR_SIZE + 1] = entryTupleIndex;
- int tStart = hashtableRecordAccessor.getTupleStartOffset(entryTupleIndex);
- int f0StartRel = hashtableRecordAccessor.getFieldStartOffset(entryTupleIndex, internalKeys[0]);
- int f0EndRel = hashtableRecordAccessor.getFieldEndOffset(entryTupleIndex, internalKeys[0]);
- int f0Start = f0StartRel + tStart + hashtableRecordAccessor.getFieldSlotsLength();
- tPointers[ptr * PTR_SIZE + 2] = firstNormalizer == null ? 0 : firstNormalizer.normalize(
- hashtableRecordAccessor.getBuffer().array(), f0Start, f0EndRel - f0StartRel);
-
- ptr++;
-
- if (ptr * PTR_SIZE >= tPointers.length) {
- int[] newTPointers = new int[tPointers.length * 2];
- System.arraycopy(tPointers, 0, newTPointers, 0, tPointers.length);
- tPointers = newTPointers;
- }
-
- // move to the next record
- int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
- int prevFrameIndex = entryFrameIndex;
- entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
- entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
-
- } while (true);
-
- // sort records
- if (ptr > 1) {
- sort(0, ptr);
- }
-
- return ptr;
- }
-
- protected void sort(int offset, int len) {
- int m = offset + (len >> 1);
- int mFrameIndex = tPointers[m * PTR_SIZE];
- int mTupleIndex = tPointers[m * PTR_SIZE + 1];
- int mNormKey = tPointers[m * PTR_SIZE + 2];
- compFrameAccessor1.reset(contents[mFrameIndex]);
-
- int a = offset;
- int b = a;
- int c = offset + len - 1;
- int d = c;
- while (true) {
- while (b <= c) {
- int bFrameIndex = tPointers[b * PTR_SIZE];
- int bTupleIndex = tPointers[b * PTR_SIZE + 1];
- int bNormKey = tPointers[b * PTR_SIZE + 2];
- int cmp = 0;
- if (bNormKey != mNormKey) {
- cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
- } else {
- compFrameAccessor2.reset(contents[bFrameIndex]);
- cmp = compare(compFrameAccessor2, bTupleIndex, compFrameAccessor1, mTupleIndex);
- }
- if (cmp > 0) {
- break;
- }
- if (cmp == 0) {
- swap(a++, b);
- }
- ++b;
- }
- while (c >= b) {
- int cFrameIndex = tPointers[c * PTR_SIZE];
- int cTupleIndex = tPointers[c * PTR_SIZE + 1];
- int cNormKey = tPointers[c * PTR_SIZE + 2];
- int cmp = 0;
- if (cNormKey != mNormKey) {
- cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
- } else {
- compFrameAccessor2.reset(contents[cFrameIndex]);
- cmp = compare(compFrameAccessor2, cTupleIndex, compFrameAccessor1, mTupleIndex);
- }
- if (cmp < 0) {
- break;
- }
- if (cmp == 0) {
- swap(c, d--);
- }
- --c;
- }
- if (b > c)
- break;
- swap(b++, c--);
- }
-
- int s;
- int n = offset + len;
- s = Math.min(a - offset, b - a);
- vecswap(offset, b - s, s);
- s = Math.min(d - c, n - d - 1);
- vecswap(b, n - s, s);
-
- if ((s = b - a) > 1) {
- sort(offset, s);
- }
- if ((s = d - c) > 1) {
- sort(n - s, s);
- }
- }
-
- private void swap(int a, int b) {
- for (int i = 0; i < PTR_SIZE; i++) {
- int t = tPointers[a * PTR_SIZE + i];
- tPointers[a * PTR_SIZE + i] = tPointers[b * PTR_SIZE + i];
- tPointers[b * PTR_SIZE + i] = t;
- }
- }
-
- private void vecswap(int a, int b, int n) {
- for (int i = 0; i < n; i++, a++, b++) {
- swap(a, b);
- }
- }
-
- protected boolean findMatch(int entry, FrameTupleAccessor accessor, int tupleIndex) {
-
- // reset the match pointer
- matchPointer.frameIndex = -1;
- matchPointer.tupleIndex = -1;
-
- // get reference in the header
- int headerFrameIndex = getHeaderFrameIndex(entry);
- int headerFrameOffset = getHeaderTupleIndex(entry);
-
- if (headers[headerFrameIndex] == null) {
- return false;
- }
-
- // initialize the pointer to the first record
- int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
- int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
-
- while (entryFrameIndex >= 0) {
- matchPointer.frameIndex = entryFrameIndex;
- matchPointer.tupleIndex = entryTupleIndex;
- hashtableRecordAccessor.reset(contents[entryFrameIndex]);
-
- if (compare(accessor, tupleIndex, hashtableRecordAccessor, entryTupleIndex) == 0) {
- return true;
- }
- // Move to the next record in this entry following the linked list
- int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
- int prevFrameIndex = entryFrameIndex;
- entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
- entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
- }
-
- return false;
- }
-
- public LinkedList<RunFileReader> getRunFileReaders() {
- return runReaders;
- }
-
- private int compare(FrameTupleAccessor accessor, int tupleIndex, FrameTupleAccessorForGroupHashtable hashAccessor,
- int hashTupleIndex) {
- int tStart0 = accessor.getTupleStartOffset(tupleIndex);
- int fStartOffset0 = accessor.getFieldSlotsLength() + tStart0;
-
- int tStart1 = hashAccessor.getTupleStartOffset(hashTupleIndex);
- int fStartOffset1 = hashAccessor.getFieldSlotsLength() + tStart1;
-
- for (int i = 0; i < keys.length; ++i) {
- int fStart0 = accessor.getFieldStartOffset(tupleIndex, keys[i]);
- int fEnd0 = accessor.getFieldEndOffset(tupleIndex, keys[i]);
- int fLen0 = fEnd0 - fStart0;
-
- int fStart1 = hashAccessor.getFieldStartOffset(hashTupleIndex, internalKeys[i]);
- int fEnd1 = hashAccessor.getFieldEndOffset(hashTupleIndex, internalKeys[i]);
- int fLen1 = fEnd1 - fStart1;
-
- int c = comparators[i].compare(accessor.getBuffer().array(), fStart0 + fStartOffset0, fLen0, hashAccessor
- .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
-
- private int compare(FrameTupleAccessorForGroupHashtable accessor1, int tupleIndex1,
- FrameTupleAccessorForGroupHashtable accessor2, int tupleIndex2) {
- int tStart1 = accessor1.getTupleStartOffset(tupleIndex1);
- int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
- int tStart2 = accessor2.getTupleStartOffset(tupleIndex2);
- int fStartOffset2 = accessor2.getFieldSlotsLength() + tStart2;
-
- for (int i = 0; i < internalKeys.length; ++i) {
- int fStart1 = accessor1.getFieldStartOffset(tupleIndex1, internalKeys[i]);
- int fEnd1 = accessor1.getFieldEndOffset(tupleIndex1, internalKeys[i]);
- int fLen1 = fEnd1 - fStart1;
-
- int fStart2 = accessor2.getFieldStartOffset(tupleIndex2, internalKeys[i]);
- int fEnd2 = accessor2.getFieldEndOffset(tupleIndex2, internalKeys[i]);
- int fLen2 = fEnd2 - fStart2;
-
- int c = comparators[i].compare(accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1, accessor2
- .getBuffer().array(), fStart2 + fStartOffset2, fLen2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
-
- public void reset() {
- for (int i = 0; i < headers.length; i++) {
- if (headers[i] != null) {
- resetHeader(i);
- }
- }
- for (int i = 0; i < contents.length; i++) {
- if (contents[i] != null) {
- contents[i].putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
- }
- }
-
- usedEntries = 0;
- totalTupleCount = 0;
- currentLargestFrameIndex = -1;
- }
-
- public void finishup() throws HyracksDataException {
- if (runReaders.size() > 0) {
- flush();
- }
-
- hashedKeys = 0;
- hashedRawRec = 0;
- }
-
- /**
- * Close the hash table. Note that only memory allocated by frames are freed. Aggregation
- * states maintained in {@link #aggState} and run file readers in {@link #runReaders} should
- * be valid for later processing.
- */
- public void close() throws HyracksDataException {
- for (int i = 0; i < headers.length; i++) {
- headers[i] = null;
- }
- for (int i = 0; i < contents.length; i++) {
- contents[i] = null;
- }
- outputBuffer = null;
- tPointers = null;
- }
-
- public int getTupleCount() {
- return totalTupleCount;
- }
-
- public int getFrameSize() {
- return headers.length + contents.length + 1;
- }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java
deleted file mode 100644
index 5296c9f..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-
-public class HybridHashSortGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-
- private static final int AGGREGATE_ACTIVITY_ID = 0;
-
- private static final int MERGE_ACTIVITY_ID = 1;
-
- private static final long serialVersionUID = 1L;
- private final int[] keyFields, storedKeyFields;
- private final INormalizedKeyComputerFactory firstNormalizerFactory;
-
- private final IAggregatorDescriptorFactory aggregatorFactory;
- private final IAggregatorDescriptorFactory mergerFactory;
-
- private final ITuplePartitionComputerFactory aggTpcf, mergeTpcf;
-
- private final int framesLimit;
- private final IBinaryComparatorFactory[] comparatorFactories;
-
- private final int tableSize;
-
- private final boolean isLoadOptimized;
-
- public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
- int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
- ITuplePartitionComputerFactory mergeTpcf, IAggregatorDescriptorFactory aggregatorFactory,
- IAggregatorDescriptorFactory mergerFactory, RecordDescriptor recordDescriptor) {
- this(spec, keyFields, framesLimit, tableSize, comparatorFactories, aggTpcf, mergeTpcf, null, aggregatorFactory,
- mergerFactory, recordDescriptor, false);
- }
-
- public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
- int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
- ITuplePartitionComputerFactory mergeTpcf, INormalizedKeyComputerFactory firstNormalizerFactory,
- IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
- RecordDescriptor recordDescriptor) {
- this(spec, keyFields, framesLimit, tableSize, comparatorFactories, aggTpcf, mergeTpcf, firstNormalizerFactory,
- aggregatorFactory, mergerFactory, recordDescriptor, false);
- }
-
- public HybridHashSortGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
- int tableSize, IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory aggTpcf,
- ITuplePartitionComputerFactory mergeTpcf, INormalizedKeyComputerFactory firstNormalizerFactory,
- IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
- RecordDescriptor recordDescriptor, boolean isLoadOpt) {
- super(spec, 1, 1);
- this.framesLimit = framesLimit;
- if (framesLimit <= 2) {
- /**
- * Minimum of 3 frames: 2 for in-memory hash table, and 1 for output
- * aggregation results.
- */
- throw new IllegalStateException("frame limit should at least be 3, but it is " + framesLimit + "!");
- }
-
- storedKeyFields = new int[keyFields.length];
- for (int i = 0; i < storedKeyFields.length; i++) {
- storedKeyFields[i] = i;
- }
- this.aggregatorFactory = aggregatorFactory;
- this.mergerFactory = mergerFactory;
- this.keyFields = keyFields;
- this.comparatorFactories = comparatorFactories;
- this.firstNormalizerFactory = firstNormalizerFactory;
- this.aggTpcf = aggTpcf;
- this.mergeTpcf = mergeTpcf;
- this.tableSize = tableSize;
-
- /**
- * Set the record descriptor. Note that since this operator is a unary
- * operator, only the first record descriptor is used here.
- */
- recordDescriptors[0] = recordDescriptor;
-
- this.isLoadOptimized = isLoadOpt;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities(edu.uci.ics.hyracks.api.dataflow.
- * IActivityGraphBuilder)
- */
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
- MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
-
- builder.addActivity(this, aggregateAct);
- builder.addSourceEdge(0, aggregateAct, 0);
-
- builder.addActivity(this, mergeAct);
- builder.addTargetEdge(0, mergeAct, 0);
-
- builder.addBlockingEdge(aggregateAct, mergeAct);
- }
-
- public static class AggregateActivityState extends AbstractStateObject {
-
- private HybridHashSortGroupHashTable gTable;
-
- public AggregateActivityState() {
- }
-
- private AggregateActivityState(JobId jobId, TaskId tId) {
- super(jobId, tId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
- throw new UnsupportedOperationException();
- }
- }
-
- private class AggregateActivity extends AbstractActivityNode {
-
- private static final long serialVersionUID = 1L;
-
- public AggregateActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
- return new AbstractUnaryInputSinkOperatorNodePushable() {
-
- HybridHashSortGroupHashTable serializableGroupHashtable;
-
- FrameTupleAccessor accessor;
-
- @Override
- public void open() throws HyracksDataException {
-
- RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-
- IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; i++) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
-
- serializableGroupHashtable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
- keyFields, comparators, aggTpcf.createPartitioner(),
- firstNormalizerFactory.createNormalizedKeyComputer(), aggregatorFactory.createAggregator(
- ctx, inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
- recordDescriptors[0]);
- accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- serializableGroupHashtable.insert(accessor, i);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-
- @Override
- public void close() throws HyracksDataException {
- serializableGroupHashtable.finishup();
- AggregateActivityState state = new AggregateActivityState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition));
- state.gTable = serializableGroupHashtable;
- ctx.setStateObject(state);
- }
- };
- }
- }
-
- private class MergeActivity extends AbstractActivityNode {
-
- private static final long serialVersionUID = 1L;
-
- public MergeActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
-
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
- public void initialize() throws HyracksDataException {
-
- AggregateActivityState aggState = (AggregateActivityState) ctx.getStateObject(new TaskId(
- new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID), partition));
-
- LinkedList<RunFileReader> runs = aggState.gTable.getRunFileReaders();
-
- writer.open();
- if (runs.size() <= 0) {
- aggState.gTable.flushHashtableToOutput(writer);
- aggState.gTable.close();
- } else {
- aggState.gTable.close();
-
- IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; i++) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
-
- HybridHashSortRunMerger merger = new HybridHashSortRunMerger(ctx, runs, storedKeyFields,
- comparators, recordDescriptors[0], mergeTpcf.createPartitioner(),
- mergerFactory.createAggregator(ctx, recordDescriptors[0], recordDescriptors[0],
- storedKeyFields, storedKeyFields), framesLimit, tableSize, writer,
- isLoadOptimized);
-
- merger.process();
- }
-
- writer.close();
- }
-
- };
- }
- }
-
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java
deleted file mode 100644
index 1de2237..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGrouperBucketMerge.java
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-
-public class HybridHashSortGrouperBucketMerge {
-
- private final int[] keyFields;
- private final IBinaryComparator[] comparators;
-
- private final IAggregatorDescriptor merger;
- private final AggregateState mergeState;
-
- private final int framesLimit, tableSize;
-
- private final RecordDescriptor inRecDesc;
-
- private final IHyracksTaskContext ctx;
-
- private final ArrayTupleBuilder tupleBuilder;
-
- private final IFrameWriter outputWriter;
-
- private final ITuplePartitionComputer tpc;
-
- private final boolean isLoadOptimized;
-
- List<ByteBuffer> inFrames;
- ByteBuffer outFrame, writerFrame;
- FrameTupleAppender outAppender, writerAppender;
- LinkedList<RunFileReader> runs;
- ArrayTupleBuilder finalTupleBuilder;
- FrameTupleAccessor outFrameAccessor;
- int[] currentFrameIndexInRun, currentRunFrames, currentBucketInRun;
- int runFrameLimit = 1;
-
- public HybridHashSortGrouperBucketMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
- ITuplePartitionComputer tpc, IBinaryComparator[] comparators, IAggregatorDescriptor merger,
- RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, IFrameWriter outputWriter)
- throws HyracksDataException {
- this.ctx = ctx;
- this.framesLimit = framesLimit;
- this.tableSize = tableSize;
-
- this.keyFields = keyFields;
- this.comparators = comparators;
- this.merger = merger;
- this.mergeState = merger.createAggregateStates();
-
- this.inRecDesc = inRecDesc;
-
- this.tupleBuilder = new ArrayTupleBuilder(inRecDesc.getFieldCount());
-
- this.outAppender = new FrameTupleAppender(ctx.getFrameSize());
-
- this.outputWriter = outputWriter;
-
- this.outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
-
- this.tpc = tpc;
-
- this.isLoadOptimized = true;
- }
-
- public HybridHashSortGrouperBucketMerge(IHyracksTaskContext ctx, int[] keyFields, int framesLimit, int tableSize,
- ITuplePartitionComputer tpc, IBinaryComparator[] comparators, IAggregatorDescriptor merger,
- RecordDescriptor inRecDesc, RecordDescriptor outRecDesc, IFrameWriter outputWriter, boolean loadOptimized)
- throws HyracksDataException {
- this.ctx = ctx;
- this.framesLimit = framesLimit;
- this.tableSize = tableSize;
-
- this.keyFields = keyFields;
- this.comparators = comparators;
- this.merger = merger;
- this.mergeState = merger.createAggregateStates();
-
- this.inRecDesc = inRecDesc;
-
- this.tupleBuilder = new ArrayTupleBuilder(inRecDesc.getFieldCount());
-
- this.outAppender = new FrameTupleAppender(ctx.getFrameSize());
-
- this.outputWriter = outputWriter;
-
- this.outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
-
- this.tpc = tpc;
-
- this.isLoadOptimized = loadOptimized;
- }
-
- public void initialize(LinkedList<RunFileReader> runFiles) throws HyracksDataException {
-
- runs = runFiles;
-
- try {
- if (runs.size() <= 0) {
- return;
- } else {
- inFrames = new ArrayList<ByteBuffer>();
- outFrame = ctx.allocateFrame();
- outAppender.reset(outFrame, true);
- outFrameAccessor.reset(outFrame);
- int runProcOffset = 0;
- while (runs.size() > 0) {
- try {
- doPass(runs, runProcOffset);
- if (runs.size() + 2 <= framesLimit) {
- // final phase
- runProcOffset = 0;
- } else {
- // one more merge level
- runProcOffset++;
- }
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
- inFrames.clear();
- }
- } catch (Exception e) {
- outputWriter.fail();
- throw new HyracksDataException(e);
- } finally {
- mergeState.close();
- }
- }
-
- private void doPass(LinkedList<RunFileReader> runs, int offset) throws HyracksDataException {
- FileReference newRun = null;
- IFrameWriter writer = outputWriter;
- boolean finalPass = false;
-
- int runNumber = runs.size() - offset;
-
- while (inFrames.size() + 2 < framesLimit) {
- inFrames.add(ctx.allocateFrame());
- }
-
- if (runNumber + 2 <= framesLimit) {
- finalPass = true;
- if (isLoadOptimized)
- runFrameLimit = (framesLimit - 2) / runNumber;
- else
- runFrameLimit = 1;
- } else {
- runFrameLimit = 1;
- runNumber = framesLimit - 2;
- newRun = ctx.getJobletContext().createManagedWorkspaceFile(
- HybridHashSortGrouperBucketMerge.class.getSimpleName());
- writer = new RunFileWriter(newRun, ctx.getIOManager());
- writer.open();
- }
- try {
- currentFrameIndexInRun = new int[runNumber];
- currentRunFrames = new int[runNumber];
- currentBucketInRun = new int[runNumber];
- /**
- * Create file readers for each input run file, only for
- * the ones fit into the inFrames
- */
- RunFileReader[] runFileReaders = new RunFileReader[runNumber];
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
- Comparator<ReferenceHashEntry> comparator = createEntryComparator(comparators);
- ReferencedBucketBasedPriorityQueue topTuples = new ReferencedBucketBasedPriorityQueue(ctx.getFrameSize(),
- inRecDesc, runNumber, comparator, tpc, tableSize);
- /**
- * current tuple index in each run
- */
- int[] tupleIndices = new int[runNumber];
-
- for (int i = 0; i < runNumber; i++) {
- int runIndex = i + offset;
- tupleIndices[i] = 0;
- // Load the run file
- runFileReaders[i] = runs.get(runIndex);
- runFileReaders[i].open();
-
- currentRunFrames[i] = 0;
- currentFrameIndexInRun[i] = i * runFrameLimit;
- for (int j = 0; j < runFrameLimit; j++) {
- int frameIndex = currentFrameIndexInRun[i] + j;
- boolean hasNextFrame = runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex));
- if (hasNextFrame) {
- tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
- tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
- currentRunFrames[i]++;
- if (j == 0) {
- currentBucketInRun[i] = tpc.partition(tupleAccessors[frameIndex], tupleIndices[i],
- tableSize);
- setNextTopTuple(i, tupleIndices, runFileReaders, tupleAccessors, topTuples);
- }
- } else {
- break;
- }
- }
- }
-
- /**
- * Start merging
- */
- while (!topTuples.areRunsExhausted()) {
- /**
- * Get the top record
- */
- ReferenceEntry top = topTuples.peek();
- int tupleIndex = top.getTupleIndex();
- int runIndex = topTuples.peek().getRunid();
-
- FrameTupleAccessor fta = top.getAccessor();
-
- int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
- if (currentTupleInOutFrame < 0
- || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
-
- tupleBuilder.reset();
-
- for (int k = 0; k < keyFields.length; k++) {
- tupleBuilder.addField(fta, tupleIndex, keyFields[k]);
- }
-
- merger.init(tupleBuilder, fta, tupleIndex, mergeState);
-
- if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
- flushOutFrame(writer, finalPass);
- if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
- throw new HyracksDataException(
- "The partial result is too large to be initialized in a frame.");
- }
- }
-
- } else {
- /**
- * If the new tuple belongs to the same group as the
- * current partial aggregate, merge it into the
- * outFrame
- */
-
- merger.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, mergeState);
-
- }
- tupleIndices[runIndex]++;
- setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
- }
-
- if (outAppender.getTupleCount() > 0) {
- flushOutFrame(writer, finalPass);
- outAppender.reset(outFrame, true);
- }
-
- merger.close();
-
- runs.subList(offset, runNumber).clear();
- /**
- * insert the new run file into the beginning of the run
- * file list
- */
- if (!finalPass) {
- runs.add(offset, ((RunFileWriter) writer).createReader());
- }
- } finally {
- if (!finalPass) {
- writer.close();
- }
- mergeState.reset();
- }
- }
-
- private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
-
- if (finalTupleBuilder == null) {
- finalTupleBuilder = new ArrayTupleBuilder(inRecDesc.getFields().length);
- }
-
- if (writerFrame == null) {
- writerFrame = ctx.allocateFrame();
- }
-
- if (writerAppender == null) {
- writerAppender = new FrameTupleAppender(ctx.getFrameSize());
- }
- writerAppender.reset(writerFrame, true);
-
- outFrameAccessor.reset(outFrame);
-
- for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-
- finalTupleBuilder.reset();
-
- for (int k = 0; k < keyFields.length; k++) {
- finalTupleBuilder.addField(outFrameAccessor, i, keyFields[k]);
- }
-
- if (isFinal) {
-
- merger.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, mergeState);
-
- } else {
-
- merger.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, mergeState);
- }
-
- if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
- finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerAppender.reset(writerFrame, true);
- if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
- finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
- throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
- }
- }
- }
- if (writerAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writerFrame, writer);
- writerAppender.reset(writerFrame, true);
- }
-
- outAppender.reset(outFrame, true);
- }
-
- private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors, ReferencedBucketBasedPriorityQueue topTuples)
- throws HyracksDataException {
- int runStart = runIndex * runFrameLimit;
- boolean existNext = false;
- if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
- /**
- * run already closed
- */
- existNext = false;
- } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
- /**
- * not the last frame for this run
- */
- existNext = true;
- if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
- tupleIndices[runIndex] = 0;
- currentFrameIndexInRun[runIndex]++;
- }
- } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
- /**
- * the last buffered frame still has tuples to process
- */
- existNext = true;
- } else {
- /**
- * All tuples in the buffered frames have been consumed:
- * reload the next batch of frames for this run.
- */
- tupleIndices[runIndex] = 0;
- currentFrameIndexInRun[runIndex] = runStart;
- /**
- * read in batch
- */
- currentRunFrames[runIndex] = 0;
- for (int j = 0; j < runFrameLimit; j++) {
- int frameIndex = currentFrameIndexInRun[runIndex] + j;
- if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
- tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
- existNext = true;
- currentRunFrames[runIndex]++;
- } else {
- break;
- }
- }
- }
-
- if (existNext) {
- topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
- } else {
- topTuples.pop();
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- /**
- * Close the run file reader and release the tuple accessors
- * for its buffered input frames.
- *
- * @param index
- * @param runCursors
- * @param tupleAccessor
- * @throws HyracksDataException
- */
- private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
- throws HyracksDataException {
- if (runCursors[index] != null) {
- runCursors[index].close();
- runCursors[index] = null;
- int frameOffset = index * runFrameLimit;
- for (int j = 0; j < runFrameLimit; j++) {
- tupleAccessor[frameOffset + j] = null;
- }
- }
- }
-
- private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldLength(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldLength(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
-
- private Comparator<ReferenceHashEntry> createEntryComparator(final IBinaryComparator[] comparators) {
- return new Comparator<ReferenceHashEntry>() {
-
- @Override
- public int compare(ReferenceHashEntry o1, ReferenceHashEntry o2) {
- int cmp = o1.getHashValue() - o2.getHashValue();
- if (cmp != 0) {
- return cmp;
- } else {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
- int j1 = o1.getTupleIndex();
- int j2 = o2.getTupleIndex();
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < keyFields.length; ++f) {
- int fIdx = f;
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- }
-
- };
- }
-}
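
A note on the merge scheduling in the deleted initialize()/doPass() pair above: a pass writes directly to the downstream writer only when every remaining run plus an output frame fits in the frame budget (runs + 2 <= framesLimit); otherwise framesLimit - 2 runs are collapsed into one new intermediate run. A minimal standalone sketch of that schedule, ignoring the runProcOffset bookkeeping (the method name and parameters below are illustrative, not taken from the deleted sources):

    // Counts the merge passes the loop above would perform. Assumptions: every
    // intermediate pass merges exactly framesLimit - 2 runs into one new run,
    // and framesLimit >= 4 so the loop always makes progress.
    static int countMergePasses(int initialRuns, int framesLimit) {
        int runs = initialRuns;
        int passes = 0;
        while (runs > framesLimit - 2) {          // not yet a final pass
            runs = runs - (framesLimit - 2) + 1;  // collapse framesLimit - 2 runs into one
            passes++;
        }
        return passes + 1;                        // the final pass feeds the output writer
    }
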
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
deleted file mode 100644
index a846e4a..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-
-public class HybridHashSortRunMerger {
-
- private final IHyracksTaskContext ctx;
- private final List<RunFileReader> runs;
- private final int[] keyFields;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDesc;
- private final int framesLimit;
- private final int tableSize;
- private final IFrameWriter writer;
- private final IAggregatorDescriptor grouper;
- private final ITuplePartitionComputer tpc;
- private ByteBuffer outFrame;
- private FrameTupleAppender outFrameAppender;
- private final boolean isLoadBuffered;
-
- public HybridHashSortRunMerger(IHyracksTaskContext ctx, LinkedList<RunFileReader> runs, int[] keyFields,
- IBinaryComparator[] comparators, RecordDescriptor recordDesc, ITuplePartitionComputer tpc,
- IAggregatorDescriptor grouper, int framesLimit, int tableSize, IFrameWriter writer, boolean isLoadBuffered) {
- this.ctx = ctx;
- this.runs = runs;
- this.keyFields = keyFields;
- this.comparators = comparators;
- this.recordDesc = recordDesc;
- this.framesLimit = framesLimit;
- this.writer = writer;
- this.isLoadBuffered = isLoadBuffered;
- this.tableSize = tableSize;
- this.tpc = tpc;
- this.grouper = grouper;
- }
-
- public void process() throws HyracksDataException {
-
- // FIXME
- int mergeLevels = 0, mergeRunCount = 0;
- try {
-
- outFrame = ctx.allocateFrame();
- outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
- outFrameAppender.reset(outFrame, true);
-
- int maxMergeWidth = framesLimit - 1;
- while (runs.size() > maxMergeWidth) {
- int generationSeparator = 0;
- // FIXME
- int mergeRounds = 0;
- while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
- int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth), runs.size()
- - maxMergeWidth + 1);
- FileReference newRun = null;
- IFrameWriter mergeResultWriter = this.writer;
- newRun = ctx.createManagedWorkspaceFile(HybridHashSortRunMerger.class.getSimpleName());
- mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
- mergeResultWriter.open();
- IFrameReader[] runCursors = new RunFileReader[mergeWidth];
- for (int i = 0; i < mergeWidth; i++) {
- runCursors[i] = runs.get(generationSeparator + i);
- }
- merge(mergeResultWriter, runCursors, false);
- runs.subList(generationSeparator, generationSeparator + mergeWidth).clear();
- runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
- mergeRounds++;
- }
- mergeLevels++;
- mergeRunCount += mergeRounds;
- }
- if (!runs.isEmpty()) {
- IFrameReader[] runCursors = new RunFileReader[runs.size()];
- for (int i = 0; i < runCursors.length; i++) {
- runCursors[i] = runs.get(i);
- }
- merge(writer, runCursors, true);
- }
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
-
- ctx.getCounterContext()
- .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.runs.count", true)
- .set(mergeRunCount);
-
- ctx.getCounterContext()
- .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.levels", true)
- .set(mergeLevels);
- }
- }
-
- private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors, boolean isFinal)
- throws HyracksDataException {
- // FIXME
- long methodTimer = System.nanoTime();
-
- IFrameReader merger = new GroupRunMergingFrameReader(ctx, runCursors, framesLimit, tableSize, keyFields, tpc,
- comparators, grouper, recordDesc, isFinal, isLoadBuffered);
- merger.open();
- try {
- while (merger.nextFrame(outFrame)) {
- FrameUtils.flushFrame(outFrame, mergeResultWriter);
- }
- } finally {
- merger.close();
- }
- ctx.getCounterContext()
- .getCounter("optional." + HybridHashSortRunMerger.class.getSimpleName() + ".merge.time", true)
- .update(System.nanoTime() - methodTimer);
- }
-}
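
The width of each intermediate merge in the deleted process() above is chosen so the run count never drops below what the final in-memory pass can absorb. A hedged restatement of that choice (the helper name is illustrative; the formula mirrors the Math.min expression in the deleted code):

    // With maxMergeWidth = framesLimit - 1, an intermediate merge takes no more runs
    // than (a) remain in the current generation, (b) fit in memory at once, or
    // (c) are needed to bring the total run count down to exactly maxMergeWidth.
    static int chooseMergeWidth(int remainingInGeneration, int totalRuns, int maxMergeWidth) {
        return Math.min(Math.min(remainingInGeneration, maxMergeWidth),
                totalRuns - maxMergeWidth + 1);
    }
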
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java
deleted file mode 100644
index 3c91fea..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceEntryWithBucketID.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-
-public class ReferenceEntryWithBucketID extends ReferenceEntry {
-
- private int bucketID;
-
- public ReferenceEntryWithBucketID(int runid, FrameTupleAccessor fta, int tupleIndex, int bucketID) {
- super(runid, fta, tupleIndex);
- this.bucketID = bucketID;
- }
-
- public int getBucketID() {
- return bucketID;
- }
-
- public void setBucketID(int bucketID) {
- this.bucketID = bucketID;
- }
-
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java
deleted file mode 100644
index 394f0a8..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferenceHashEntry.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-
-public class ReferenceHashEntry extends ReferenceEntry {
-
- private int hashValue;
-
- public ReferenceHashEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int hashVal) {
- super(runid, fta, tupleIndex);
- this.hashValue = hashVal;
- }
-
- public int getHashValue() {
- return hashValue;
- }
-
- public void setHashValue(int hashVal) {
- this.hashValue = hashVal;
- }
-
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java
deleted file mode 100644
index adfbe81..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedBucketBasedPriorityQueue.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Comparator;
-
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-
-public class ReferencedBucketBasedPriorityQueue {
-
- private final int frameSize;
- private final RecordDescriptor recordDescriptor;
- private final ReferenceHashEntry entries[];
- private final int size;
- private final BitSet runAvail;
- private int nItems;
- private final int tableSize;
-
- private final Comparator<ReferenceHashEntry> comparator;
-
- private final ITuplePartitionComputer tpc;
-
- public ReferencedBucketBasedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
- Comparator<ReferenceHashEntry> comparator, ITuplePartitionComputer tpc, int tableSize) {
- this.frameSize = frameSize;
- this.recordDescriptor = recordDescriptor;
- if (initSize < 1)
- throw new IllegalArgumentException();
- this.comparator = comparator;
- nItems = initSize;
- size = (initSize + 1) & 0xfffffffe;
- entries = new ReferenceHashEntry[size];
- runAvail = new BitSet(size);
- runAvail.set(0, initSize, true);
- for (int i = 0; i < size; i++) {
- entries[i] = new ReferenceHashEntry(i, null, -1, -1);
- }
- this.tpc = tpc;
- this.tableSize = tableSize;
- }
-
- /**
- * Retrieve the top entry without removing it
- *
- * @return the top entry
- */
- public ReferenceEntry peek() {
- return entries[0];
- }
-
- /**
- * Replace the top entry with the given tuple (the next head of the same run)
- * and sift it down to its position in the queue.
- *
- * @param fta
- * @param tIndex
- * @return the run id of the replaced entry
- * @throws HyracksDataException
- */
- public int popAndReplace(FrameTupleAccessor fta, int tIndex) throws HyracksDataException {
- ReferenceHashEntry entry = entries[0];
- if (entry.getAccessor() == null) {
- entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
- }
- entry.getAccessor().reset(fta.getBuffer());
- entry.setTupleIndex(tIndex);
- entry.setHashValue(tpc.partition(fta, tIndex, tableSize));
-
- add(entry);
- return entry.getRunid();
- }
-
- /**
- * Push entry into priority queue
- *
- * @param e
- * the new Entry
- * @throws HyracksDataException
- */
- private void add(ReferenceHashEntry e) throws HyracksDataException {
- ReferenceHashEntry min = entries[0];
- int slot = (size >> 1) + (min.getRunid() >> 1);
-
- ReferenceHashEntry curr = e;
- while (!runAvail.isEmpty() && slot > 0) {
- int c = 0;
- if (!runAvail.get(entries[slot].getRunid())) {
- // run of entries[slot] is exhausted, i.e. not available, curr
- // wins
- c = 1;
- } else if (entries[slot].getAccessor() != null /*
- * entries[slot] is
- * not MIN value
- */
- && runAvail.get(curr.getRunid() /* curr run is available */)) {
-
- if (curr.getAccessor() != null) {
- c = comparator.compare(entries[slot], curr);
- } else {
- // curr is MIN value, wins
- c = 1;
- }
- }
-
- if (c <= 0) { // curr lost
- // entries[slot] swaps up
- ReferenceHashEntry tmp = entries[slot];
- entries[slot] = curr;
- curr = tmp;// winner to pass up
- }// else curr wins
- slot >>= 1;
- }
- // set new entries[0]
- entries[0] = curr;
- }
-
- /**
- * Pop is called only when a run is exhausted
- *
- * @return
- * @throws HyracksDataException
- */
- public ReferenceHashEntry pop() throws HyracksDataException {
- ReferenceHashEntry min = entries[0];
- runAvail.clear(min.getRunid());
- add(min);
- nItems--;
- return min;
- }
-
- public boolean areRunsExhausted() {
- return runAvail.isEmpty();
- }
-
- public int size() {
- return nItems;
- }
-}
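
The queue above orders run heads by hash bucket first and only then by the key comparators, which is what lets a k-way merge emit grouped results bucket by bucket. A hedged sketch of the driver loop such a merge runs against this queue; consume, advanceRun, accessorFor, and tupleIndexFor are illustrative placeholders, not part of the deleted sources:

    while (!topTuples.areRunsExhausted()) {
        ReferenceEntry top = topTuples.peek();        // smallest (bucket, key) among the run heads
        consume(top);                                 // aggregate with, or append to, the output
        int runId = top.getRunid();
        if (advanceRun(runId)) {                      // the run still has tuples available
            topTuples.popAndReplace(accessorFor(runId), tupleIndexFor(runId));
        } else {
            topTuples.pop();                          // run exhausted; mark it unavailable
        }
    }
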
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java
deleted file mode 100644
index d9d5118..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/ReferencedPriorityQueue.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package edu.uci.ics.hyracks.dataflow.std.group.hashsort;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Comparator;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * TODO: needs to be merged with the ReferencedPriorityQueue in the util package
- */
-public class ReferencedPriorityQueue {
- private final int frameSize;
- private final RecordDescriptor recordDescriptor;
- private final ReferenceEntryWithBucketID entries[];
- private final int size;
- private final BitSet runAvail;
- private int nItems;
-
- private final Comparator<ReferenceEntryWithBucketID> comparator;
-
- public ReferencedPriorityQueue(int frameSize, RecordDescriptor recordDescriptor, int initSize,
- Comparator<ReferenceEntryWithBucketID> comparator) {
- this.frameSize = frameSize;
- this.recordDescriptor = recordDescriptor;
- if (initSize < 1)
- throw new IllegalArgumentException();
- this.comparator = comparator;
- nItems = initSize;
- size = (initSize + 1) & 0xfffffffe;
- entries = new ReferenceEntryWithBucketID[size];
- runAvail = new BitSet(size);
- runAvail.set(0, initSize, true);
- for (int i = 0; i < size; i++) {
- entries[i] = new ReferenceEntryWithBucketID(i, null, -1, -1);
- }
- }
-
- /**
- * Retrieve the top entry without removing it
- *
- * @return the top entry
- */
- public ReferenceEntryWithBucketID peek() {
- return entries[0];
- }
-
- /**
- * Replace the top entry with the given tuple (the next head of the same run)
- * and sift it down to its position in the queue.
- *
- * @param fta
- * @param tIndex
- * @param bucketID
- * @return the run id of the replaced entry
- */
- public int popAndReplace(FrameTupleAccessor fta, int tIndex, int bucketID) {
- ReferenceEntryWithBucketID entry = entries[0];
- if (entry.getAccessor() == null) {
- entry.setAccessor(new FrameTupleAccessor(frameSize, recordDescriptor));
- }
- entry.getAccessor().reset(fta.getBuffer());
- entry.setTupleIndex(tIndex);
- entry.setBucketID(bucketID);
-
- add(entry);
- return entry.getRunid();
- }
-
- /**
- * Push entry into priority queue
- *
- * @param e
- * the new Entry
- */
- private void add(ReferenceEntryWithBucketID e) {
- ReferenceEntryWithBucketID min = entries[0];
- int slot = (size >> 1) + (min.getRunid() >> 1);
-
- ReferenceEntryWithBucketID curr = e;
- while (!runAvail.isEmpty() && slot > 0) {
- int c = 0;
- if (!runAvail.get(entries[slot].getRunid())) {
- // run of entries[slot] is exhausted, i.e. not available, curr
- // wins
- c = 1;
- } else if (entries[slot].getAccessor() != null /*
- * entries[slot] is
- * not MIN value
- */
- && runAvail.get(curr.getRunid() /* curr run is available */)) {
-
- if (curr.getAccessor() != null) {
- c = comparator.compare(entries[slot], curr);
- } else {
- // curr is MIN value, wins
- c = 1;
- }
- }
-
- if (c <= 0) { // curr lost
- // entries[slot] swaps up
- ReferenceEntryWithBucketID tmp = entries[slot];
- entries[slot] = curr;
- curr = tmp;// winner to pass up
- }// else curr wins
- slot >>= 1;
- }
- // set new entries[0]
- entries[0] = curr;
- }
-
- /**
- * Pop is called only when a run is exhausted
- *
- * @return
- */
- public ReferenceEntryWithBucketID pop() {
- ReferenceEntryWithBucketID min = entries[0];
- runAvail.clear(min.getRunid());
- add(min);
- nItems--;
- return min;
- }
-
- public boolean areRunsExhausted() {
- return runAvail.isEmpty();
- }
-
- public int size() {
- return nItems;
- }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java
deleted file mode 100644
index 72bae76..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAccessorForGroupHashtable.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class FrameTupleAccessorForGroupHashtable implements IFrameTupleAccessor {
- private final int frameSize;
- private final RecordDescriptor recordDescriptor;
-
- private final static int INT_SIZE = 4;
-
- private ByteBuffer buffer;
-
- public FrameTupleAccessorForGroupHashtable(int frameSize, RecordDescriptor recordDescriptor) {
- this.frameSize = frameSize;
- this.recordDescriptor = recordDescriptor;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldCount()
- */
- @Override
- public int getFieldCount() {
- return recordDescriptor.getFieldCount();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldSlotsLength()
- */
- @Override
- public int getFieldSlotsLength() {
- return getFieldCount() * 4;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldEndOffset(int, int)
- */
- @Override
- public int getFieldEndOffset(int tupleIndex, int fIdx) {
- return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldStartOffset(int, int)
- */
- @Override
- public int getFieldStartOffset(int tupleIndex, int fIdx) {
- return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getFieldLength(int, int)
- */
- @Override
- public int getFieldLength(int tupleIndex, int fIdx) {
- return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleEndOffset(int)
- */
- @Override
- public int getTupleEndOffset(int tupleIndex) {
- return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1)) - 2 * INT_SIZE;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleStartOffset(int)
- */
- @Override
- public int getTupleStartOffset(int tupleIndex) {
- return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getTupleCount()
- */
- @Override
- public int getTupleCount() {
- return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#getBuffer()
- */
- @Override
- public ByteBuffer getBuffer() {
- return buffer;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor#reset(java.nio.ByteBuffer)
- */
- @Override
- public void reset(ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- public int getTupleHashReferenceOffset(int tupleIndex) {
- return getTupleEndOffset(tupleIndex);
- }
-
- public int getTupleEndOffsetWithHashReference(int tupleIndex) {
- return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
- }
-
- public int getHashReferenceNextFrameIndex(int tupleIndex) {
- return buffer.getInt(getTupleHashReferenceOffset(tupleIndex));
- }
-
- public int getHashReferenceNextTupleIndex(int tupleIndex) {
- return buffer.getInt(getTupleHashReferenceOffset(tupleIndex) + INT_SIZE);
- }
-
-}
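
The accessor above assumes a frame layout where tuple data grows from the front of the frame while the tuple count and one 4-byte end-offset slot per tuple grow from the back, and where every hash-table tuple carries two trailing ints linking it to the next entry of its bucket. A hedged restatement of the trailing-pointer arithmetic (INT_SIZE of 4 comes from the deleted class; the method names are illustrative):

    static final int INT_SIZE = 4;

    // Given the end offset stored in the frame's tail slot for a tuple, the payload
    // visible to ordinary field access ends 2 * INT_SIZE bytes earlier, and the two
    // link ints (next frame index, next tuple index) occupy those trailing bytes.
    static int payloadEndOffset(int storedEndOffset) {
        return storedEndOffset - 2 * INT_SIZE;   // mirrors getTupleEndOffset()
    }

    static int nextFrameIndexOffset(int storedEndOffset) {
        return storedEndOffset - 2 * INT_SIZE;   // mirrors getTupleHashReferenceOffset()
    }

    static int nextTupleIndexOffset(int storedEndOffset) {
        return storedEndOffset - INT_SIZE;       // second of the two trailing ints
    }
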
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java
deleted file mode 100644
index c5668f5..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/FrameTupleAppenderForGroupHashtable.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.FrameHelper;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-
-public class FrameTupleAppenderForGroupHashtable {
- private final int frameSize;
-
- private ByteBuffer buffer;
-
- private int tupleCount;
-
- private int tupleDataEndOffset;
-
- public FrameTupleAppenderForGroupHashtable(int frameSize) {
- this.frameSize = frameSize;
- }
-
- public void reset(ByteBuffer buffer, boolean clear) {
- this.buffer = buffer;
- if (clear) {
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
- tupleCount = 0;
- tupleDataEndOffset = 0;
- } else {
- tupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
- tupleDataEndOffset = tupleCount == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize)
- - tupleCount * 4);
- }
- }
-
- public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) {
- if (tupleDataEndOffset + fieldSlots.length * 4 + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
- for (int i = 0; i < fieldSlots.length; ++i) {
- buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
- }
- System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + fieldSlots.length * 4, length);
- buffer.putInt(tupleDataEndOffset + fieldSlots.length * 4 + length, -1);
- buffer.putInt(tupleDataEndOffset + fieldSlots.length * 4 + length + 4, -1);
- tupleDataEndOffset += fieldSlots.length * 4 + length + 2 * 4;
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
- ++tupleCount;
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
- return true;
- }
- return false;
- }
-
- public boolean append(byte[] bytes, int offset, int length) {
- if (tupleDataEndOffset + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
- System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset, length);
- buffer.putInt(tupleDataEndOffset + length, -1);
- buffer.putInt(tupleDataEndOffset + length + 4, -1);
- tupleDataEndOffset += length + 2 * 4;
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
- ++tupleCount;
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
- return true;
- }
- return false;
- }
-
- public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length) {
- if (tupleDataEndOffset + fieldSlots.length * 4 + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
- int effectiveSlots = 0;
- for (int i = 0; i < fieldSlots.length; ++i) {
- if (fieldSlots[i] > 0) {
- buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots[i]);
- effectiveSlots++;
- }
- }
- System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + effectiveSlots * 4, length);
- buffer.putInt(tupleDataEndOffset + effectiveSlots * 4 + length, -1);
- buffer.putInt(tupleDataEndOffset + effectiveSlots * 4 + length + 4, -1);
- tupleDataEndOffset += effectiveSlots * 4 + length + 2 * 4;
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
- ++tupleCount;
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
- return true;
- }
- return false;
- }
-
- public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
- int length = tEndOffset - tStartOffset;
- if (tupleDataEndOffset + length + 2 * 4 + 4 + (tupleCount + 1) * 4 <= frameSize) {
- ByteBuffer src = tupleAccessor.getBuffer();
- System.arraycopy(src.array(), tStartOffset, buffer.array(), tupleDataEndOffset, length);
- buffer.putInt(tupleDataEndOffset + length, -1);
- buffer.putInt(tupleDataEndOffset + length + 4, -1);
- tupleDataEndOffset += length + 2 * 4;
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
- ++tupleCount;
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
- return true;
- }
- return false;
- }
-
- public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) {
- int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
- int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
- return append(tupleAccessor, tStartOffset, tEndOffset);
- }
-
- public int getTupleCount() {
- return tupleCount;
- }
-
- public ByteBuffer getBuffer() {
- return buffer;
- }
-}
-
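
All four append variants above run the same free-space test before copying: the new tuple's bytes, its two trailing reference ints, and the per-tuple metadata kept at the tail of the frame must all still fit. A hedged restatement of that test (helper and parameter names are illustrative; the arithmetic mirrors the condition in the deleted appenders):

    static boolean fits(int dataEndOffset, int fieldSlotCount, int payloadLength,
            int tupleCount, int frameSize) {
        int newTupleBytes = fieldSlotCount * 4 + payloadLength + 2 * 4; // slots + data + (frameIdx, tupleIdx)
        int tailMetadata = 4 + (tupleCount + 1) * 4;                    // tuple count + one end-offset per tuple
        return dataEndOffset + newTupleBytes + tailMetadata <= frameSize;
    }
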
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
deleted file mode 100644
index b325b83..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
+++ /dev/null
@@ -1,609 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
-
-public class HybridHashGroupHashTable implements IFrameWriter {
-
- private final static int HEADER_REF_EMPTY = -1;
-
- private static final int INT_SIZE = 4;
-
- private IHyracksTaskContext ctx;
-
- private final int frameSize;
-
- private final int framesLimit;
-
- private final int tableSize;
-
- private final int numOfPartitions;
-
- private final IFrameWriter outputWriter;
-
- private final IBinaryComparator[] comparators;
-
- /**
- * index for keys
- */
- private final int[] inputKeys, internalKeys;
-
- private final RecordDescriptor inputRecordDescriptor, outputRecordDescriptor;
-
- /**
- * hash function used for hash table lookups
- */
- private final ITuplePartitionComputer hashComputer;
-
- /**
- * hash function used for assigning spilled records to partitions
- */
- private final ITuplePartitionComputer partitionComputer;
-
- /**
- * Hash table headers
- */
- private ByteBuffer[] headers;
-
- /**
- * buffers for hash table
- */
- private ByteBuffer[] contents;
-
- /**
- * output buffers for spilled partitions
- */
- private ByteBuffer[] spilledPartOutputBuffers;
-
- /**
- * run writers for spilled partitions
- */
- private RunFileWriter[] spilledPartRunWriters;
-
- private int[] spilledPartRunSizeArrayInFrames;
- private int[] spilledPartRunSizeArrayInTuples;
-
- private List<IFrameReader> spilledPartRunReaders = null;
- private List<Integer> spilledRunAggregatedPages = null;
- private List<Integer> spilledPartRunSizesInFrames = null;
- private List<Integer> spilledPartRunSizesInTuples = null;
-
- /**
- * index of the current working buffer in hash table
- */
- private int currentHashTableFrame;
-
- /**
- * Aggregation state
- */
- private AggregateState htAggregateState;
-
- /**
- * the aggregator
- */
- private final IAggregatorDescriptor aggregator;
-
- /**
- * records inserted into the in-memory hash table (for hashing and aggregation)
- */
- private int hashedRawRecords = 0;
-
- /**
- * in-memory part size in tuples
- */
- private int hashedKeys = 0;
-
- /**
- * Hash table tuple pointer
- */
- private TuplePointer matchPointer;
-
- /**
- * Frame tuple accessor for input data frames
- */
- private FrameTupleAccessor inputFrameTupleAccessor;
-
- /**
- * flag for whether the hash table is full
- */
- private boolean isHashtableFull;
-
- /**
- * flag for only partition (no aggregation and hashing)
- */
- private boolean isPartitionOnly;
-
- /**
- * Tuple accessor for hash table contents
- */
- private FrameTupleAccessorForGroupHashtable hashtableRecordAccessor;
-
- private ArrayTupleBuilder internalTupleBuilder;
-
- private FrameTupleAppender spilledPartInsertAppender;
-
- private FrameTupleAppenderForGroupHashtable htInsertAppender;
-
- public HybridHashGroupHashTable(IHyracksTaskContext ctx, int framesLimit, int tableSize, int numOfPartitions,
- int[] keys, int hashSeedOffset, IBinaryComparator[] comparators, ITuplePartitionComputerFamily tpcFamily,
- IAggregatorDescriptor aggregator, RecordDescriptor inputRecordDescriptor,
- RecordDescriptor outputRecordDescriptor, IFrameWriter outputWriter) throws HyracksDataException {
- this.ctx = ctx;
- this.frameSize = ctx.getFrameSize();
- this.tableSize = tableSize;
- this.framesLimit = framesLimit;
- this.numOfPartitions = numOfPartitions;
- this.inputKeys = keys;
- this.internalKeys = new int[keys.length];
- for (int i = 0; i < internalKeys.length; i++) {
- internalKeys[i] = i;
- }
-
- this.comparators = comparators;
-
- this.inputRecordDescriptor = inputRecordDescriptor;
- this.outputRecordDescriptor = outputRecordDescriptor;
-
- this.outputWriter = outputWriter;
-
- this.hashComputer = tpcFamily.createPartitioner(hashSeedOffset * 2);
- this.partitionComputer = tpcFamily.createPartitioner(hashSeedOffset * 2 + 1);
-
- this.aggregator = aggregator;
-
- }
-
- public static double getHashtableOverheadRatio(int tableSize, int frameSize, int framesLimit, int recordSizeInByte) {
- int pagesForRecord = framesLimit - getHeaderPages(tableSize, frameSize);
- int recordsInHashtable = (pagesForRecord - 1) * ((int) (frameSize / (recordSizeInByte + 2 * INT_SIZE)));
-
- return (double) framesLimit * frameSize / recordsInHashtable / recordSizeInByte;
- }
-
- public static int getHeaderPages(int tableSize, int frameSize) {
- return (int) Math.ceil((double)tableSize * INT_SIZE * 2 / frameSize);
- }
-
- @Override
- public void open() throws HyracksDataException {
- // initialize hash headers
- int htHeaderCount = getHeaderPages(tableSize, frameSize);
-
- isPartitionOnly = false;
- if (numOfPartitions >= framesLimit - htHeaderCount) {
- isPartitionOnly = true;
- }
-
- if (isPartitionOnly) {
- htHeaderCount = 0;
- }
-
- headers = new ByteBuffer[htHeaderCount];
-
- // initialize hash table contents
- contents = new ByteBuffer[framesLimit - htHeaderCount - numOfPartitions];
- currentHashTableFrame = 0;
- isHashtableFull = false;
-
- // initialize hash table aggregate state
- htAggregateState = aggregator.createAggregateStates();
-
- // initialize partition information
- spilledPartOutputBuffers = new ByteBuffer[numOfPartitions];
- spilledPartRunWriters = new RunFileWriter[numOfPartitions];
- spilledPartRunSizeArrayInFrames = new int[numOfPartitions];
- spilledPartRunSizeArrayInTuples = new int[numOfPartitions];
-
- // initialize other helper classes
- inputFrameTupleAccessor = new FrameTupleAccessor(frameSize, inputRecordDescriptor);
- internalTupleBuilder = new ArrayTupleBuilder(outputRecordDescriptor.getFieldCount());
- spilledPartInsertAppender = new FrameTupleAppender(frameSize);
-
- htInsertAppender = new FrameTupleAppenderForGroupHashtable(frameSize);
- matchPointer = new TuplePointer();
- hashtableRecordAccessor = new FrameTupleAccessorForGroupHashtable(frameSize, outputRecordDescriptor);
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inputFrameTupleAccessor.reset(buffer);
- int tupleCount = inputFrameTupleAccessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- insert(inputFrameTupleAccessor, i);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void close() throws HyracksDataException {
- for (int i = 0; i < numOfPartitions; i++) {
- if (spilledPartRunWriters[i] != null) {
- spilledPartRunWriters[i].close();
- }
- }
- htAggregateState.close();
- }
-
- private void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
-
- if (isPartitionOnly) {
- // for partition only
- int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
- insertSpilledPartition(accessor, tupleIndex, pid);
- spilledPartRunSizeArrayInTuples[pid]++;
- return;
- }
-
- int hid = hashComputer.partition(accessor, tupleIndex, tableSize);
-
- if (findMatch(hid, accessor, tupleIndex)) {
- // found a matching: do aggregation
- hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
- aggregator.aggregate(accessor, tupleIndex, hashtableRecordAccessor, matchPointer.tupleIndex,
- htAggregateState);
- hashedRawRecords++;
- } else {
- if (isHashtableFull) {
- // when hash table is full: spill the record
- int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
- insertSpilledPartition(accessor, tupleIndex, pid);
- spilledPartRunSizeArrayInTuples[pid]++;
- } else {
- // insert a new entry into the hash table
- internalTupleBuilder.reset();
- for (int k = 0; k < inputKeys.length; k++) {
- internalTupleBuilder.addField(accessor, tupleIndex, inputKeys[k]);
- }
-
- aggregator.init(internalTupleBuilder, accessor, tupleIndex, htAggregateState);
-
- if (contents[currentHashTableFrame] == null) {
- contents[currentHashTableFrame] = ctx.allocateFrame();
- }
-
- htInsertAppender.reset(contents[currentHashTableFrame], false);
- if (!htInsertAppender.append(internalTupleBuilder.getFieldEndOffsets(),
- internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
- // hash table is full: try to allocate more frame
- currentHashTableFrame++;
- if (currentHashTableFrame >= contents.length) {
- // no more frames to allocate: stop expanding the hash table
- isHashtableFull = true;
-
- // reinsert the record
- insert(accessor, tupleIndex);
-
- return;
- } else {
- if (contents[currentHashTableFrame] == null) {
- contents[currentHashTableFrame] = ctx.allocateFrame();
- }
-
- htInsertAppender.reset(contents[currentHashTableFrame], true);
-
- if (!htInsertAppender.append(internalTupleBuilder.getFieldEndOffsets(),
- internalTupleBuilder.getByteArray(), 0, internalTupleBuilder.getSize())) {
- throw new HyracksDataException(
- "Failed to insert an aggregation partial result into the in-memory hash table: it has the length of "
- + internalTupleBuilder.getSize() + " and fields "
- + internalTupleBuilder.getFieldEndOffsets().length);
- }
-
- }
- }
-
- // update hash table reference
- if (matchPointer.frameIndex < 0) {
- // need to initialize the hash table header
- int headerFrameIndex = getHeaderFrameIndex(hid);
- int headerFrameOffset = getHeaderTupleIndex(hid);
-
- if (headers[headerFrameIndex] == null) {
- headers[headerFrameIndex] = ctx.allocateFrame();
- resetHeader(headerFrameIndex);
- }
-
- headers[headerFrameIndex].putInt(headerFrameOffset, currentHashTableFrame);
- headers[headerFrameIndex]
- .putInt(headerFrameOffset + INT_SIZE, htInsertAppender.getTupleCount() - 1);
- } else {
- // update the previous reference
- hashtableRecordAccessor.reset(contents[matchPointer.frameIndex]);
- int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(matchPointer.tupleIndex);
- contents[matchPointer.frameIndex].putInt(refOffset, currentHashTableFrame);
- contents[matchPointer.frameIndex]
- .putInt(refOffset + INT_SIZE, htInsertAppender.getTupleCount() - 1);
- }
-
- hashedKeys++;
- hashedRawRecords++;
- }
- }
- }
-
- /**
- * Insert record into a spilled partition, by directly copying the tuple into the output buffer.
- *
- * @param accessor
- * @param tupleIndex
- * @param pid
- */
- private void insertSpilledPartition(FrameTupleAccessor accessor, int tupleIndex, int pid)
- throws HyracksDataException {
-
- if (spilledPartOutputBuffers[pid] == null) {
- spilledPartOutputBuffers[pid] = ctx.allocateFrame();
- }
-
- spilledPartInsertAppender.reset(spilledPartOutputBuffers[pid], false);
-
- if (!spilledPartInsertAppender.append(accessor, tupleIndex)) {
- // the output buffer is full: flush
- flushSpilledPartitionOutputBuffer(pid);
- // reset the output buffer
- spilledPartInsertAppender.reset(spilledPartOutputBuffers[pid], true);
-
- if (!spilledPartInsertAppender.append(accessor, tupleIndex)) {
- throw new HyracksDataException("Failed to insert a record into a spilled partition!");
- }
- }
-
- }
-
- /**
- * Flush a spilled partition's output buffer.
- *
- * @param pid
- * @throws HyracksDataException
- */
- private void flushSpilledPartitionOutputBuffer(int pid) throws HyracksDataException {
- if (spilledPartRunWriters[pid] == null) {
- spilledPartRunWriters[pid] = new RunFileWriter(
- ctx.createManagedWorkspaceFile("HashHashPrePartitionHashTable"), ctx.getIOManager());
- spilledPartRunWriters[pid].open();
- }
-
- FrameUtils.flushFrame(spilledPartOutputBuffers[pid], spilledPartRunWriters[pid]);
-
- spilledPartRunSizeArrayInFrames[pid]++;
- }
-
- /**
- * Hash table lookup
- *
- * @param hid
- * @param accessor
- * @param tupleIndex
- * @return
- */
- private boolean findMatch(int hid, FrameTupleAccessor accessor, int tupleIndex) {
-
- matchPointer.frameIndex = -1;
- matchPointer.tupleIndex = -1;
-
- // get reference in the header
- int headerFrameIndex = getHeaderFrameIndex(hid);
- int headerFrameOffset = getHeaderTupleIndex(hid);
-
- if (headers[headerFrameIndex] == null) {
- return false;
- }
-
- // initialize the pointer to the first record
- int entryFrameIndex = headers[headerFrameIndex].getInt(headerFrameOffset);
- int entryTupleIndex = headers[headerFrameIndex].getInt(headerFrameOffset + INT_SIZE);
-
- while (entryFrameIndex >= 0) {
- matchPointer.frameIndex = entryFrameIndex;
- matchPointer.tupleIndex = entryTupleIndex;
- hashtableRecordAccessor.reset(contents[entryFrameIndex]);
- if (compare(accessor, tupleIndex, hashtableRecordAccessor, entryTupleIndex) == 0) {
- return true;
- }
- // Move to the next record in this entry following the linked list
- int refOffset = hashtableRecordAccessor.getTupleHashReferenceOffset(entryTupleIndex);
- int prevFrameIndex = entryFrameIndex;
- entryFrameIndex = contents[prevFrameIndex].getInt(refOffset);
- entryTupleIndex = contents[prevFrameIndex].getInt(refOffset + INT_SIZE);
- }
-
- return false;
- }
-
- public void finishup() throws HyracksDataException {
- // spill all output buffers
- for (int i = 0; i < numOfPartitions; i++) {
- if (spilledPartOutputBuffers[i] != null) {
- flushSpilledPartitionOutputBuffer(i);
- }
- }
- spilledPartOutputBuffers = null;
-
- // flush in-memory aggregation results: no more frame cost here as all output buffers are recycled
- ByteBuffer outputBuffer = ctx.allocateFrame();
- FrameTupleAppender outputBufferAppender = new FrameTupleAppender(frameSize);
- outputBufferAppender.reset(outputBuffer, true);
-
- ArrayTupleBuilder outFlushTupleBuilder = new ArrayTupleBuilder(outputRecordDescriptor.getFieldCount());
-
- for (ByteBuffer htFrame : contents) {
- if (htFrame == null) {
- continue;
- }
- hashtableRecordAccessor.reset(htFrame);
- int tupleCount = hashtableRecordAccessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- outFlushTupleBuilder.reset();
-
- for (int k = 0; k < internalKeys.length; k++) {
- outFlushTupleBuilder.addField(hashtableRecordAccessor, i, internalKeys[k]);
- }
-
- aggregator.outputFinalResult(outFlushTupleBuilder, hashtableRecordAccessor, i, htAggregateState);
-
- if (!outputBufferAppender.append(outFlushTupleBuilder.getFieldEndOffsets(),
- outFlushTupleBuilder.getByteArray(), 0, outFlushTupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, outputWriter);
- outputBufferAppender.reset(outputBuffer, true);
-
- if (!outputBufferAppender.append(outFlushTupleBuilder.getFieldEndOffsets(),
- outFlushTupleBuilder.getByteArray(), 0, outFlushTupleBuilder.getSize())) {
- throw new HyracksDataException(
- "Failed to flush a record from in-memory hash table: record has length of "
- + outFlushTupleBuilder.getSize() + " and fields "
- + outFlushTupleBuilder.getFieldEndOffsets().length);
- }
- }
- }
- }
-
- if (outputBufferAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputBuffer, outputWriter);
- }
-
- // create run readers and statistic information for spilled runs
- spilledPartRunReaders = new LinkedList<IFrameReader>();
- spilledRunAggregatedPages = new LinkedList<Integer>();
- spilledPartRunSizesInFrames = new LinkedList<Integer>();
- spilledPartRunSizesInTuples = new LinkedList<Integer>();
- for (int i = 0; i < numOfPartitions; i++) {
- if (spilledPartRunWriters[i] != null) {
- spilledPartRunReaders.add(spilledPartRunWriters[i].createReader());
- spilledRunAggregatedPages.add(0);
- spilledPartRunWriters[i].close();
- spilledPartRunSizesInFrames.add(spilledPartRunSizeArrayInFrames[i]);
- spilledPartRunSizesInTuples.add(spilledPartRunSizeArrayInTuples[i]);
- }
- }
- }
-
- /**
- * Compare an input record with a hash table entry.
- *
- * @param accessor
- * @param tupleIndex
- * @param hashAccessor
- * @param hashTupleIndex
- * @return
- */
- private int compare(FrameTupleAccessor accessor, int tupleIndex, FrameTupleAccessorForGroupHashtable hashAccessor,
- int hashTupleIndex) {
- int tStart0 = accessor.getTupleStartOffset(tupleIndex);
- int fStartOffset0 = accessor.getFieldSlotsLength() + tStart0;
-
- int tStart1 = hashAccessor.getTupleStartOffset(hashTupleIndex);
- int fStartOffset1 = hashAccessor.getFieldSlotsLength() + tStart1;
-
- for (int i = 0; i < internalKeys.length; ++i) {
- int fStart0 = accessor.getFieldStartOffset(tupleIndex, inputKeys[i]);
- int fEnd0 = accessor.getFieldEndOffset(tupleIndex, inputKeys[i]);
- int fLen0 = fEnd0 - fStart0;
-
- int fStart1 = hashAccessor.getFieldStartOffset(hashTupleIndex, internalKeys[i]);
- int fEnd1 = hashAccessor.getFieldEndOffset(hashTupleIndex, internalKeys[i]);
- int fLen1 = fEnd1 - fStart1;
-
- int c = comparators[i].compare(accessor.getBuffer().array(), fStart0 + fStartOffset0, fLen0, hashAccessor
- .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
-
- /**
- * Get the header frame index of the given hash table entry
- *
- * @param entry
- * @return
- */
- private int getHeaderFrameIndex(int entry) {
- int frameIndex = (entry / frameSize * 2 * INT_SIZE) + (entry % frameSize * 2 * INT_SIZE / frameSize);
- return frameIndex;
- }
-
- /**
- * Get the tuple index of the given hash table entry
- *
- * @param entry
- * @return
- */
- private int getHeaderTupleIndex(int entry) {
- int offset = (entry % frameSize) * 2 * INT_SIZE % frameSize;
- return offset;
- }
-
- /**
- * reset the header page.
- *
- * @param headerFrameIndex
- */
- private void resetHeader(int headerFrameIndex) {
- for (int i = 0; i < frameSize; i += INT_SIZE) {
- headers[headerFrameIndex].putInt(i, HEADER_REF_EMPTY);
- }
- }
-
- public List<Integer> getSpilledRunsSizeInRawTuples() throws HyracksDataException {
- return spilledPartRunSizesInTuples;
- }
-
- public int getHashedUniqueKeys() throws HyracksDataException {
- return hashedKeys;
- }
-
- public int getHashedRawRecords() throws HyracksDataException {
- return hashedRawRecords;
- }
-
- public List<Integer> getSpilledRunsAggregatedPages() throws HyracksDataException {
- return spilledRunAggregatedPages;
- }
-
- public List<IFrameReader> getSpilledRuns() throws HyracksDataException {
- return spilledPartRunReaders;
- }
-
- public List<Integer> getSpilledRunsSizeInPages() throws HyracksDataException {
- return spilledPartRunSizesInFrames;
- }
-
-}
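
Each bucket header in the hash table above is a pair of 4-byte ints (content frame index, tuple index), so locating a header is plain 8-byte addressing over the header frames; the deleted getHeaderFrameIndex()/getHeaderTupleIndex() spell it out in a form that avoids overflowing entry * 8. A hedged equivalent (the method names are illustrative):

    // For bucket 'entry', the header occupies bytes [entry * 8, entry * 8 + 8) of the
    // concatenated header frames; these match the deleted methods' results.
    static int headerFrameIndex(int entry, int frameSize) {
        return (entry / frameSize * 2 * 4) + (entry % frameSize * 2 * 4 / frameSize);
    }

    static int headerFrameOffset(int entry, int frameSize) {
        return (entry % frameSize) * 2 * 4 % frameSize;
    }
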
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
deleted file mode 100644
index 118ca75..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.hashsort.HybridHashSortGroupHashTable;
-import edu.uci.ics.hyracks.dataflow.std.group.hashsort.HybridHashSortRunMerger;
-
-public class HybridHashGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- private static final double HYBRID_FALLBACK_THRESHOLD = 0.8;
-
- // magnifier applied to the estimated input size when computing the number of top-level partitions
- private static final double ESTIMATOR_MAGNIFIER = 1.2;
-
- // input key fields
- private final int[] keyFields;
-
- // intermediate and final key fields
- private final int[] storedKeyFields;
-
- /**
- * Input size as the count of raw records.
- */
-
- /**
- * Input size as the count of the unique keys.
- */
- private final long inputSizeInUniqueKeys;
-
- // hash table size
- private final int tableSize;
-
- // estimated record size in bytes, used to compute the fudge factor
- private final int userProvidedRecordSizeInBytes;
-
- // aggregator
- private final IAggregatorDescriptorFactory aggregatorFactory;
-
- // merger factory, used when falling back to the hash-sort algorithm due to hash skew
- private final IAggregatorDescriptorFactory mergerFactory;
-
- // for the sort fall-back algorithm
- private final INormalizedKeyComputerFactory firstNormalizerFactory;
-
- // total memory in pages
- private final int framesLimit;
-
- // comparator factories for key fields.
- private final IBinaryComparatorFactory[] comparatorFactories;
-
- /**
- * Hash function families for the key fields: a hash function family is needed
- * because more than one level of hashing may be used.
- */
- private final IBinaryHashFunctionFamily[] hashFamilies;
-
- /**
- * Flag controlling whether the input cardinality estimate is adjusted using statistics observed at runtime.
- */
- private final boolean doInputAdjustment;
-
- private final static double FUDGE_FACTOR_ESTIMATION = 1.2;
-
- public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
- long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
- IBinaryComparatorFactory[] comparatorFactories, IBinaryHashFunctionFamily[] hashFamilies,
- int hashFuncStartLevel, INormalizedKeyComputerFactory firstNormalizerFactory,
- IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
- RecordDescriptor outRecDesc) throws HyracksDataException {
- this(spec, keyFields, framesLimit, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- comparatorFactories, hashFamilies, hashFuncStartLevel, firstNormalizerFactory, aggregatorFactory,
- mergerFactory, outRecDesc, true);
- }
-
- public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
- long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
- IBinaryComparatorFactory[] comparatorFactories, IBinaryHashFunctionFamily[] hashFamilies,
- int hashFuncStartLevel, INormalizedKeyComputerFactory firstNormalizerFactory,
- IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
- RecordDescriptor outRecDesc, boolean doInputAdjustment) throws HyracksDataException {
- super(spec, 1, 1);
- this.framesLimit = framesLimit;
- this.tableSize = tableSize;
- this.userProvidedRecordSizeInBytes = recordSizeInBytes;
-
- this.inputSizeInRawRecords = inputSizeInRawRecords;
- this.inputSizeInUniqueKeys = inputSizeInUniqueKeys;
-
- if (framesLimit <= 3) {
- // at least 3 frames: 2 for in-memory hash table, and 1 for output buffer
- throw new HyracksDataException(
- "Not enough memory for Hash-Hash Aggregation algorithm: at least 3 frames are necessary, but only "
- + framesLimit + " available.");
- }
-
- this.keyFields = keyFields;
- storedKeyFields = new int[keyFields.length];
- for (int i = 0; i < storedKeyFields.length; i++) {
- storedKeyFields[i] = i;
- }
-
- this.aggregatorFactory = aggregatorFactory;
-
- this.mergerFactory = mergerFactory;
- this.firstNormalizerFactory = firstNormalizerFactory;
-
- this.comparatorFactories = comparatorFactories;
-
- this.hashFamilies = hashFamilies;
-
- recordDescriptors[0] = outRecDesc;
-
- this.doInputAdjustment = doInputAdjustment;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparators.length; i++) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
-
- final RecordDescriptor inRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-
- final int frameSize = ctx.getFrameSize();
-
- final double fudgeFactor = HybridHashGroupHashTable.getHashtableOverheadRatio(tableSize, frameSize,
- framesLimit, userProvidedRecordSizeInBytes) * FUDGE_FACTOR_ESTIMATION;
-
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-
- HybridHashGroupHashTable topProcessor;
-
- int observedInputSizeInRawTuples;
- int observedInputSizeInFrames, maxRecursiveLevels;
-
- int userProvidedInputSizeInFrames;
-
- boolean topLevelFallbackCheck = true;
-
- ITuplePartitionComputerFamily tpcf = new FieldHashPartitionComputerFamily(keyFields, hashFamilies);
-
- ITuplePartitionComputerFamily tpcfMerge = new FieldHashPartitionComputerFamily(storedKeyFields,
- hashFamilies);
-
- ByteBuffer readAheadBuf;
-
- /**
- * Compute the number of partitions using the hybrid-hash formula.
- *
- * @param tableSize size of the in-memory hash table
- * @param framesLimit total memory budget in frames
- * @param inputKeySize estimated input size, in frames of unique keys
- * @param factor fudge factor
- * @return the number of partitions to create
- */
- private int getNumberOfPartitions(int tableSize, int framesLimit, long inputKeySize, double factor) {
-
- int hashtableHeaderPages = HybridHashGroupHashTable.getHeaderPages(tableSize, frameSize);
-
- int numberOfPartitions = HybridHashUtil.hybridHashPartitionComputer((int) Math.ceil(inputKeySize),
- framesLimit, factor);
-
- // if the number of partitions exceeds the frames left after the hash table headers, fall back to pure partitioning
- if (numberOfPartitions >= framesLimit - hashtableHeaderPages) {
- numberOfPartitions = framesLimit;
- }
-
- if (numberOfPartitions <= 0) {
- numberOfPartitions = 1;
- }
-
- return numberOfPartitions;
- }
-
- @Override
- public void open() throws HyracksDataException {
-
- observedInputSizeInFrames = 0;
-
- // estimate the number of unique keys for this partition, given the total raw record count and unique record count
- long estimatedNumberOfUniqueKeys = HybridHashUtil.getEstimatedPartitionSizeOfUniqueKeys(
- inputSizeInRawRecords, inputSizeInUniqueKeys, 1);
-
- userProvidedInputSizeInFrames = (int) Math.ceil(estimatedNumberOfUniqueKeys
- * userProvidedRecordSizeInBytes / frameSize);
-
- int topPartitions = getNumberOfPartitions(tableSize, framesLimit,
- (int) Math.ceil(userProvidedInputSizeInFrames * ESTIMATOR_MAGNIFIER), fudgeFactor);
-
- topProcessor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize, topPartitions, keyFields, 0,
- comparators, tpcf, aggregatorFactory.createAggregator(ctx, inRecDesc, recordDescriptors[0],
- keyFields, storedKeyFields), inRecDesc, recordDescriptors[0], writer);
-
- writer.open();
- topProcessor.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- observedInputSizeInRawTuples += buffer.getInt(buffer.capacity() - 4);
- observedInputSizeInFrames++;
- topProcessor.nextFrame(buffer);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void close() throws HyracksDataException {
- // estimate the maximum recursive levels
- maxRecursiveLevels = (int) Math.max(
- Math.ceil(Math.log(observedInputSizeInFrames * fudgeFactor) / Math.log(framesLimit)) + 1, 1);
-
- finishAndRecursion(topProcessor, observedInputSizeInRawTuples, inputSizeInUniqueKeys, 0,
- topLevelFallbackCheck);
-
- writer.close();
-
- }
-
- private void processRunFiles(IFrameReader runReader, int inputCardinality, int runLevel)
- throws HyracksDataException {
-
- boolean checkFallback = true;
-
- int numOfPartitions = getNumberOfPartitions(tableSize, framesLimit, (long)inputCardinality
- * userProvidedRecordSizeInBytes / frameSize, fudgeFactor);
-
- HybridHashGroupHashTable processor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize,
- numOfPartitions, keyFields, runLevel, comparators, tpcf, aggregatorFactory.createAggregator(
- ctx, inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
- recordDescriptors[0], writer);
-
- processor.open();
-
- runReader.open();
-
- int inputRunRawSizeInTuples = 0;
-
- if (readAheadBuf == null) {
- readAheadBuf = ctx.allocateFrame();
- }
- while (runReader.nextFrame(readAheadBuf)) {
- inputRunRawSizeInTuples += readAheadBuf.getInt(readAheadBuf.capacity() - 4);
- processor.nextFrame(readAheadBuf);
- }
-
- runReader.close();
-
- finishAndRecursion(processor, inputRunRawSizeInTuples, inputCardinality, runLevel, checkFallback);
- }
-
- /**
- * Finish the hash table processing and recursively process the spilled run files.
- *
- * @param ht the hash table to finish
- * @param inputRawRecordCount number of raw input records fed to the hash table
- * @param inputCardinality estimated number of unique keys in the input
- * @param level current recursion level
- * @param checkFallback whether to check the fallback condition for skewed partitions
- * @throws HyracksDataException
- */
- private void finishAndRecursion(HybridHashGroupHashTable ht, long inputRawRecordCount,
- long inputCardinality, int level, boolean checkFallback) throws HyracksDataException {
-
- ht.finishup();
-
- List<IFrameReader> generatedRunReaders = ht.getSpilledRuns();
- List<Integer> partitionRawRecords = ht.getSpilledRunsSizeInRawTuples();
-
- int directFlushKeysInTuples = ht.getHashedUniqueKeys();
- int directFlushRawRecordsInTuples = ht.getHashedRawRecords();
-
- ht.close();
- ht = null;
-
- ctx.getCounterContext().getCounter("optional.levels." + level + ".estiInputKeyCardinality", true)
- .update(inputCardinality);
-
- // adjust the cardinality estimate using the unique-to-raw ratio observed among the directly flushed records
- if (doInputAdjustment && directFlushRawRecordsInTuples > 0) {
- inputCardinality = (int) Math.ceil((double) directFlushKeysInTuples / directFlushRawRecordsInTuples
- * inputRawRecordCount);
- }
-
- ctx.getCounterContext()
- .getCounter("optional.levels." + level + ".estiInputKeyCardinalityAdjusted", true)
- .update(inputCardinality);
-
- IFrameReader recurRunReader;
- int subPartitionRawRecords;
-
- while (!generatedRunReaders.isEmpty()) {
- recurRunReader = generatedRunReaders.remove(0);
- subPartitionRawRecords = partitionRawRecords.remove(0);
-
- int runKeyCardinality = (int) Math.ceil((double) inputCardinality * subPartitionRawRecords
- / inputRawRecordCount);
-
- if ((checkFallback && runKeyCardinality > HYBRID_FALLBACK_THRESHOLD * inputCardinality)
- || level > maxRecursiveLevels) {
- Logger.getLogger(HybridHashGroupOperatorDescriptor.class.getSimpleName()).warning(
- "Hybrid-hash falls back to hash-sort algorithm! (" + level + ":" + maxRecursiveLevels
- + ")");
- fallback(recurRunReader, level);
- } else {
- processRunFiles(recurRunReader, runKeyCardinality, level + 1);
- }
-
- }
- }
-
- private void fallback(IFrameReader recurRunReader, int runLevel) throws HyracksDataException {
- // fall back
- FrameTupleAccessor runFrameTupleAccessor = new FrameTupleAccessor(frameSize, inRecDesc);
- HybridHashSortGroupHashTable hhsTable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
- keyFields, comparators, tpcf.createPartitioner(runLevel + 1),
- firstNormalizerFactory.createNormalizedKeyComputer(), aggregatorFactory.createAggregator(ctx,
- inRecDesc, recordDescriptors[0], keyFields, storedKeyFields), inRecDesc,
- recordDescriptors[0]);
-
- recurRunReader.open();
- if (readAheadBuf == null) {
- readAheadBuf = ctx.allocateFrame();
- }
- while (recurRunReader.nextFrame(readAheadBuf)) {
- runFrameTupleAccessor.reset(readAheadBuf);
- int tupleCount = runFrameTupleAccessor.getTupleCount();
- for (int j = 0; j < tupleCount; j++) {
- hhsTable.insert(runFrameTupleAccessor, j);
- }
- }
-
- recurRunReader.close();
- hhsTable.finishup();
-
- LinkedList<RunFileReader> hhsRuns = hhsTable.getRunFileReaders();
-
- if (hhsRuns.isEmpty()) {
- hhsTable.flushHashtableToOutput(writer);
- hhsTable.close();
- } else {
- hhsTable.close();
- HybridHashSortRunMerger hhsMerger = new HybridHashSortRunMerger(ctx, hhsRuns, storedKeyFields,
- comparators, recordDescriptors[0], tpcfMerge.createPartitioner(runLevel + 1),
- mergerFactory.createAggregator(ctx, recordDescriptors[0], recordDescriptors[0],
- storedKeyFields, storedKeyFields), framesLimit, tableSize, writer, false);
- hhsMerger.process();
- }
- }
-
- };
- }
-}
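
Aside, not part of the patch: the close()/finishAndRecursion() logic above reduces to two small decisions,
restated here as a standalone sketch with made-up numbers so the formulas are easier to read (class and
method names are illustrative only).

public class HybridHashControlSketch {
    private static final double HYBRID_FALLBACK_THRESHOLD = 0.8;

    // mirrors the maxRecursiveLevels estimate in close(): log base framesLimit of the
    // fudge-adjusted input size, plus one level for the top pass
    static int maxRecursiveLevels(int observedInputSizeInFrames, double fudgeFactor, int framesLimit) {
        return (int) Math.max(
                Math.ceil(Math.log(observedInputSizeInFrames * fudgeFactor) / Math.log(framesLimit)) + 1, 1);
    }

    // mirrors the per-run fallback test in finishAndRecursion(): a spilled run whose estimated key
    // cardinality is still close to its parent's is not shrinking, so recursing further is pointless
    static boolean shouldFallBack(int runKeyCardinality, long inputCardinality, int level, int maxLevels) {
        return runKeyCardinality > HYBRID_FALLBACK_THRESHOLD * inputCardinality || level > maxLevels;
    }

    public static void main(String[] args) {
        int maxLevels = maxRecursiveLevels(10_000, 1.44, 128);        // about 3 levels for 10k frames
        System.out.println(maxLevels);
        System.out.println(shouldFallBack(900, 1_000, 1, maxLevels)); // true: the run barely shrank
        System.out.println(shouldFallBack(200, 1_000, 1, maxLevels)); // false: recurse normally
    }
}
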
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java
deleted file mode 100644
index 5323887..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashUtil.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-public class HybridHashUtil {
-
- /**
- * Compute the expected number of spilling partitions (the in-memory partition is not included), using the
- * hybrid-hash formula from [Shapiro86]. A result of 0 means that no spilling partitions are needed.
- *
- * @param inputSizeOfUniqueKeysInFrames estimated size of the unique keys in the input, in frames
- * @param memorySizeInFrames memory budget in frames
- * @param fudgeFactor fudge factor accounting for hash table overhead
- * @return the number of spilling partitions
- */
- public static int hybridHashPartitionComputer(int inputSizeOfUniqueKeysInFrames, int memorySizeInFrames,
- double fudgeFactor) {
- return Math.max(
- (int) Math.ceil((inputSizeOfUniqueKeysInFrames * fudgeFactor - memorySizeInFrames)
- / (memorySizeInFrames - 1)), 0);
- }
-
- /**
- * Compute the estimated number of unique keys in one partition of a dataset, using Yao's formula.
- *
- * @param inputSizeInRawRecords total number of raw records in the dataset
- * @param inputSizeInUniqueKeys total number of unique keys in the dataset
- * @param numOfPartitions number of partitions the dataset is split into
- * @return the estimated number of unique keys per partition
- */
- public static long getEstimatedPartitionSizeOfUniqueKeys(long inputSizeInRawRecords, long inputSizeInUniqueKeys,
- int numOfPartitions) {
- if (numOfPartitions == 1) {
- return inputSizeInUniqueKeys;
- }
- return (long) Math.ceil(inputSizeInUniqueKeys
- * (1 - Math.pow(1 - ((double) inputSizeInRawRecords / (double) numOfPartitions)
- / (double) inputSizeInRawRecords, (double) inputSizeInRawRecords
- / (double) inputSizeInUniqueKeys)));
- }
-}
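
Aside, not part of the patch: a short worked example, with assumed numbers, of the two formulas this
utility class carried.

public class HybridHashUtilExample {
    public static void main(String[] args) {
        // Shapiro86: 1000 frames of unique keys, 100 frames of memory, fudge factor 1.2
        // -> ceil((1000 * 1.2 - 100) / (100 - 1)) = ceil(1100 / 99) = 12 spilling partitions
        int spillingPartitions = Math.max((int) Math.ceil((1000 * 1.2 - 100) / (100 - 1)), 0);
        System.out.println(spillingPartitions); // 12

        // Yao: 1,000,000 raw records with 10,000 unique keys split into 4 partitions.
        // Each key appears ~100 times, so almost every key shows up in every partition and the
        // per-partition estimate stays close to the full 10,000.
        long raw = 1_000_000, unique = 10_000;
        int p = 4;
        long perPartition = (long) Math.ceil(unique * (1 - Math.pow(1 - 1.0 / p, (double) raw / unique)));
        System.out.println(perPartition); // 10000
    }
}
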