Addressing Review Comments for External Sort with Replacement Selection
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@869 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
index 28c3f34..f9dda7f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
@@ -8,790 +8,743 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
/**
- * @author pouria Memory Manager based on Binary Search Tree (BST) Free slot
- * size is the key for the BST nodes. Each node in BST shows a class of
- * free slots, while all the free slots with the exact same length are
- * stored as a LinkedList, whose head is a BST node. BST is not stored
- * as a separate data structure, but the actual free spaces of the
- * memory slots are used to hold BST nodes. Each BST node has the
- * logical structure, defined in the BSTNodeUtil class.
+ * @author pouria
+ *
+ * A memory manager that organizes free slots as a Binary Search Tree
+ * (BST) keyed on free-slot size. Each BST node represents a class of
+ * free slots; all the free slots within a class have the same length.
+ * Slots in a class are stored as a linked list whose head is the BST
+ * node corresponding to that class. The BST is not stored as a
+ * separate data structure; instead, the free slots in memory are used
+ * to hold the BST nodes. Each BST node has the logical structure
+ * defined in the BSTNodeUtil class.
*
*/
public class BSTMemMgr implements IMemoryManager {
- private final IHyracksCommonContext ctx;
- public static int FRAME_SIZE;
+ private final IHyracksCommonContext ctx;
+ public static int frameSize;
- private ByteBuffer[] frames;
- private ByteBuffer convertBuffer;
- private Slot root;
- private Slot result; // A reusable object to hold one node returned as
- // methods result
- private Slot insertSlot; // A reusable object to hold one node in the insert
- // process
- private Slot[] par_res;
- private int lastFrame;
+ private ByteBuffer[] frames;
+ private ByteBuffer convertBuffer;
+ private Slot root;
+ private Slot result; // A reusable object to hold one node returned as
+ // method result
+ private Slot insertSlot; // A reusable object to hold one node within insert
+ // process
- private static int _debug_free_slots = 0;
- private static int _debug_tree_size = 0;
- private int _debug_total_lookup_steps;
- private int _debug_total_lookup_counts;
- private int _debug_depth_counter;
- private int _debug_max_depth;
+ private Slot[] parentRes;
+ private int lastFrame;
- public BSTMemMgr(IHyracksCommonContext ctx, int memSize) {
- this.ctx = ctx;
- FRAME_SIZE = ctx.getFrameSize();
- convertBuffer = ByteBuffer.allocate(4);
- frames = new ByteBuffer[memSize];
- lastFrame = -1;
- root = new Slot();
- insertSlot = new Slot();
- result = new Slot();
- par_res = new Slot[] { new Slot(), new Slot() };
- _debug_total_lookup_counts = 0;
- _debug_total_lookup_steps = 0;
- }
+ // Variables used for debugging/performance testing
+ private int debugFreeSlots = 0;
+ private int debugTreeSize = 0;
+ private int debugTotalLookupSteps;
+ private int debugTotalLookupCounts;
+ private int debugDepthCounter;
+ private int debugMaxDepth;
- /**
- * result is the container sent by the caller to hold the results
- */
- @Override
- public void allocate(int length, Slot result) throws HyracksDataException {
- _debug_total_lookup_counts++;
- search(length, par_res);
- if (par_res[1].isNull()) {
- addFrame(par_res);
- if (par_res[1].isNull()) {
- return;
- }
- }
+ public BSTMemMgr(IHyracksCommonContext ctx, int memSize) {
+ this.ctx = ctx;
+ frameSize = ctx.getFrameSize();
+ convertBuffer = ByteBuffer.allocate(4);
+ frames = new ByteBuffer[memSize];
+ lastFrame = -1;
+ root = new Slot();
+ insertSlot = new Slot();
+ result = new Slot();
+ parentRes = new Slot[] { new Slot(), new Slot() };
+ debugTotalLookupCounts = 0;
+ debugTotalLookupSteps = 0;
+ }
- int sl = BSTNodeUtil.getLength(par_res[1], frames, convertBuffer);
- int acLen = BSTNodeUtil.getActualLength(length);
- if (shouldSplit(sl, acLen)) {
- int[] s = split(par_res[1], par_res[0], acLen);
- int insertLen = BSTNodeUtil.getLength(s[2], s[3], frames,
- convertBuffer);
- insert(s[2], s[3], insertLen); // inserting second half of the split
- // slot
- BSTNodeUtil.setHeaderFooter(s[0], s[1], length, false, frames);
- result.set(s[0], s[1]);
- return;
- }
- allocate(par_res[1], par_res[0], length, result);
- }
+ /**
+ * Allocates a slot of the requested length; result is the container supplied by the caller to receive the allocated slot
+ */
+ @Override
+ public void allocate(int length, Slot result) throws HyracksDataException {
+ debugTotalLookupCounts++;
+ search(length, parentRes);
+ if (parentRes[1].isNull()) {
+ addFrame(parentRes);
+ if (parentRes[1].isNull()) {
+ return;
+ }
+ }
- @Override
- public int unallocate(Slot s) throws HyracksDataException {
- int usedLen = BSTNodeUtil.getLength(s, frames, convertBuffer);
- int actualLen = BSTNodeUtil.getActualLength(usedLen);
- int fix = s.getFrameIx();
- int off = s.getOffset();
+ int sl = BSTNodeUtil.getLength(parentRes[1], frames, convertBuffer);
+ int acLen = BSTNodeUtil.getActualLength(length);
+ if (shouldSplit(sl, acLen)) {
+ int[] s = split(parentRes[1], parentRes[0], acLen);
+ int insertLen = BSTNodeUtil.getLength(s[2], s[3], frames, convertBuffer);
+ insert(s[2], s[3], insertLen); // inserting second half of the split
+ // slot
+ BSTNodeUtil.setHeaderFooter(s[0], s[1], length, false, frames);
+ result.set(s[0], s[1]);
+ return;
+ }
+ allocate(parentRes[1], parentRes[0], length, result);
+ }
- int prevMemSlotFooter_off = ((off - BSTNodeUtil.HEADER_SIZE) >= 0 ? (off - BSTNodeUtil.HEADER_SIZE)
- : BSTNodeUtil.INVALID_INDEX);
- int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
- int nextMemSlotHeader_off = (t < FRAME_SIZE ? t
- : BSTNodeUtil.INVALID_INDEX);
- // Remember: next and prev memory slots have the same frame index as the
- // unallocating slot
- if (!isNodeNull(fix, prevMemSlotFooter_off)
- && BSTNodeUtil.isFree(fix, prevMemSlotFooter_off, frames)) {
- int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooter_off,
- frames, convertBuffer);
- removeFromList(fix, prevMemSlotFooter_off - leftLength
- - BSTNodeUtil.HEADER_SIZE);
- int concatLength = actualLen + leftLength + 2
- * BSTNodeUtil.HEADER_SIZE;
- if (!isNodeNull(fix, nextMemSlotHeader_off)
- && BSTNodeUtil.isFree(fix, nextMemSlotHeader_off, frames)) {
- removeFromList(fix, nextMemSlotHeader_off);
- concatLength += BSTNodeUtil.getLength(fix,
- nextMemSlotHeader_off, frames, convertBuffer)
- + 2
- * BSTNodeUtil.HEADER_SIZE;
- }
- insert(fix, prevMemSlotFooter_off - leftLength
- - BSTNodeUtil.HEADER_SIZE, concatLength); // newly (merged)
- // slot starts
- // at the prev
- // slot offset
- return concatLength;
+ @Override
+ public int unallocate(Slot s) throws HyracksDataException {
+ int usedLen = BSTNodeUtil.getLength(s, frames, convertBuffer);
+ int actualLen = BSTNodeUtil.getActualLength(usedLen);
+ int fix = s.getFrameIx();
+ int off = s.getOffset();
- } else if (!isNodeNull(fix, nextMemSlotHeader_off)
- && BSTNodeUtil.isFree(fix, nextMemSlotHeader_off, frames)) {
- removeFromList(fix, nextMemSlotHeader_off);
- int concatLength = actualLen
- + BSTNodeUtil.getLength(fix, nextMemSlotHeader_off, frames,
- convertBuffer) + 2 * BSTNodeUtil.HEADER_SIZE;
- insert(fix, off, concatLength); // newly (merged) slot starts at the
- // unallocating slot offset
- return concatLength;
- }
- // unallocating slot is not merging with any neighbor
- insert(fix, off, actualLen);
- return actualLen;
- }
+ int prevMemSlotFooterOffset = ((off - BSTNodeUtil.HEADER_SIZE) >= 0 ? (off - BSTNodeUtil.HEADER_SIZE)
+ : BSTNodeUtil.INVALID_INDEX);
+ int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
+ int nextMemSlotHeaderOffset = (t < frameSize ? t : BSTNodeUtil.INVALID_INDEX);
+ // Remember: next and prev memory slots have the same frame index as the
+ // unallocated slot
+ if (!isNodeNull(fix, prevMemSlotFooterOffset) && BSTNodeUtil.isFree(fix, prevMemSlotFooterOffset, frames)) {
+ int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooterOffset, frames, convertBuffer);
+ removeFromList(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE);
+ int concatLength = actualLen + leftLength + 2 * BSTNodeUtil.HEADER_SIZE;
+ if (!isNodeNull(fix, nextMemSlotHeaderOffset) && BSTNodeUtil.isFree(fix, nextMemSlotHeaderOffset, frames)) {
+ removeFromList(fix, nextMemSlotHeaderOffset);
+ concatLength += BSTNodeUtil.getLength(fix, nextMemSlotHeaderOffset, frames, convertBuffer) + 2
+ * BSTNodeUtil.HEADER_SIZE;
+ }
+ insert(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE, concatLength); // newly
+ // (merged)
+ // slot
+ // starts
+ // at
+ // the
+ // prev
+ // slot
+ // offset
+ return concatLength;
- @Override
- public boolean readTuple(int frameIx, int offset, FrameTupleAppender dest) {
- int offToRead = offset + BSTNodeUtil.HEADER_SIZE;
- int length = BSTNodeUtil.getLength(frameIx, offset, frames,
- convertBuffer);
- return dest.append(frames[frameIx].array(), offToRead, length);
- }
+ } else if (!isNodeNull(fix, nextMemSlotHeaderOffset)
+ && BSTNodeUtil.isFree(fix, nextMemSlotHeaderOffset, frames)) {
+ removeFromList(fix, nextMemSlotHeaderOffset);
+ int concatLength = actualLen + BSTNodeUtil.getLength(fix, nextMemSlotHeaderOffset, frames, convertBuffer)
+ + 2 * BSTNodeUtil.HEADER_SIZE;
+ insert(fix, off, concatLength); // newly (merged) slot starts at the
+ // unallocated slot offset
+ return concatLength;
+ }
+ // unallocated slot is not merging with any neighbor
+ insert(fix, off, actualLen);
+ return actualLen;
+ }
- @Override
- public boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src,
- int tIndex) {
- int offToCopy = offset + BSTNodeUtil.HEADER_SIZE;
- int tStartOffset = src.getTupleStartOffset(tIndex);
- int tEndOffset = src.getTupleEndOffset(tIndex);
- int tupleLength = tEndOffset - tStartOffset;
- ByteBuffer srcBuffer = src.getBuffer();
- System.arraycopy(srcBuffer.array(), tStartOffset,
- frames[frameIx].array(), offToCopy, tupleLength);
- return true;
- }
+ @Override
+ public boolean readTuple(int frameIx, int offset, FrameTupleAppender dest) {
+ int offToRead = offset + BSTNodeUtil.HEADER_SIZE;
+ int length = BSTNodeUtil.getLength(frameIx, offset, frames, convertBuffer);
+ return dest.append(frames[frameIx].array(), offToRead, length);
+ }
- @Override
- public ByteBuffer getFrame(int frameIndex) {
- return frames[frameIndex];
- }
+ @Override
+ public boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src, int tIndex) {
+ int offToCopy = offset + BSTNodeUtil.HEADER_SIZE;
+ int tStartOffset = src.getTupleStartOffset(tIndex);
+ int tEndOffset = src.getTupleEndOffset(tIndex);
+ int tupleLength = tEndOffset - tStartOffset;
+ ByteBuffer srcBuffer = src.getBuffer();
+ System.arraycopy(srcBuffer.array(), tStartOffset, frames[frameIx].array(), offToCopy, tupleLength);
+ return true;
+ }
- public String _debug_getAvgSearchPath() {
- double avg = (((double) _debug_total_lookup_steps) / ((double) _debug_total_lookup_counts));
- return "\nTotal allocation requests:\t" + _debug_total_lookup_counts
- + "\nAvg Allocation Path Length:\t" + avg
- + "\nMax BST Depth:\t" + _debug_max_depth;
- }
+ @Override
+ public ByteBuffer getFrame(int frameIndex) {
+ return frames[frameIndex];
+ }
- public void _debug_decLookupCount() {
- _debug_total_lookup_counts--;
- }
+ public String debugGetAvgSearchPath() {
+ double avg = (((double) debugTotalLookupSteps) / ((double) debugTotalLookupCounts));
+ return "\nTotal allocation requests:\t" + debugTotalLookupCounts + "\nAvg Allocation Path Length:\t" + avg
+ + "\nMax BST Depth:\t" + debugMaxDepth;
+ }
- /**
- *
- * @param p_r
- * is the container passed by the caller to contain the results
- * @throws HyracksDataException
- */
- private void addFrame(Slot[] p_r) throws HyracksDataException {
- _debug_depth_counter = 0;
- clear(p_r);
- if ((lastFrame + 1) >= frames.length) {
- return;
- }
- frames[++lastFrame] = allocateFrame();
- int l = FRAME_SIZE - 2 * BSTNodeUtil.HEADER_SIZE;
- BSTNodeUtil.setHeaderFooter(lastFrame, 0, l, true, frames);
- initNewNode(lastFrame, 0);
+ public void debugDecLookupCount() {
+ debugTotalLookupCounts--;
+ }
- p_r[1].copy(root);
- if (p_r[1].isNull()) { // root is null
- root.set(lastFrame, 0);
- initNewNode(root.getFrameIx(), root.getOffset());
- p_r[1].copy(root);
- return;
- }
+ /**
+ *
+ * @param parentResult
+ * is the container passed by the caller to contain the results
+ * @throws HyracksDataException
+ */
+ private void addFrame(Slot[] parentResult) throws HyracksDataException {
+ debugDepthCounter = 0;
+ clear(parentResult);
+ if ((lastFrame + 1) >= frames.length) {
+ return;
+ }
+ frames[++lastFrame] = allocateFrame();
+ int l = frameSize - 2 * BSTNodeUtil.HEADER_SIZE;
+ BSTNodeUtil.setHeaderFooter(lastFrame, 0, l, true, frames);
+ initNewNode(lastFrame, 0);
- while (!p_r[1].isNull()) {
- _debug_depth_counter++;
- if (BSTNodeUtil.getLength(p_r[1], frames, convertBuffer) == l) {
- append(p_r[1].getFrameIx(), p_r[1].getOffset(), lastFrame, 0);
- p_r[1].set(lastFrame, 0);
- if (_debug_depth_counter > _debug_max_depth) {
- _debug_max_depth = _debug_depth_counter;
- }
- return;
- }
- if (l < BSTNodeUtil.getLength(p_r[1], frames, convertBuffer)) {
- if (isNodeNull(BSTNodeUtil.leftChild_fIx(p_r[1], frames,
- convertBuffer), BSTNodeUtil.leftChild_offset(p_r[1],
- frames, convertBuffer))) {
- BSTNodeUtil.setLeftChild(p_r[1].getFrameIx(),
- p_r[1].getOffset(), lastFrame, 0, frames);
- p_r[0].copy(p_r[1]);
- p_r[1].set(lastFrame, 0);
- if (_debug_depth_counter > _debug_max_depth) {
- _debug_max_depth = _debug_depth_counter;
- }
- return;
- } else {
- p_r[0].copy(p_r[1]);
- p_r[1].set(BSTNodeUtil.leftChild_fIx(p_r[1], frames,
- convertBuffer), BSTNodeUtil.leftChild_offset(
- p_r[1], frames, convertBuffer));
- }
- } else {
- if (isNodeNull(BSTNodeUtil.rightChild_fIx(p_r[1], frames,
- convertBuffer), BSTNodeUtil.rightChild_offset(p_r[1],
- frames, convertBuffer))) {
- BSTNodeUtil.setRightChild(p_r[1].getFrameIx(),
- p_r[1].getOffset(), lastFrame, 0, frames);
- p_r[0].copy(p_r[1]);
- p_r[1].set(lastFrame, 0);
- if (_debug_depth_counter > _debug_max_depth) {
- _debug_max_depth = _debug_depth_counter;
- }
- return;
- } else {
- p_r[0].copy(p_r[1]);
- p_r[1].set(BSTNodeUtil.rightChild_fIx(p_r[1], frames,
- convertBuffer), BSTNodeUtil.rightChild_offset(
- p_r[1], frames, convertBuffer));
- }
- }
- }
- throw new HyracksDataException(
- "New Frame could not be added to BSTMemMgr");
- }
+ parentResult[1].copy(root);
+ if (parentResult[1].isNull()) { // root is null
+ root.set(lastFrame, 0);
+ initNewNode(root.getFrameIx(), root.getOffset());
+ parentResult[1].copy(root);
+ return;
+ }
- private void insert(int fix, int off, int length)
- throws HyracksDataException {
- _debug_depth_counter = 0;
- BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
- initNewNode(fix, off);
+ while (!parentResult[1].isNull()) {
+ debugDepthCounter++;
+ if (BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer) == l) {
+ append(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0);
+ parentResult[1].set(lastFrame, 0);
+ if (debugDepthCounter > debugMaxDepth) {
+ debugMaxDepth = debugDepthCounter;
+ }
+ return;
+ }
+ if (l < BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer)) {
+ if (isNodeNull(BSTNodeUtil.getLeftChildFrameIx(parentResult[1], frames, convertBuffer),
+ BSTNodeUtil.getLeftChildOffset(parentResult[1], frames, convertBuffer))) {
+ BSTNodeUtil.setLeftChild(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0,
+ frames);
+ parentResult[0].copy(parentResult[1]);
+ parentResult[1].set(lastFrame, 0);
+ if (debugDepthCounter > debugMaxDepth) {
+ debugMaxDepth = debugDepthCounter;
+ }
+ return;
+ } else {
+ parentResult[0].copy(parentResult[1]);
+ parentResult[1].set(BSTNodeUtil.getLeftChildFrameIx(parentResult[1], frames, convertBuffer),
+ BSTNodeUtil.getLeftChildOffset(parentResult[1], frames, convertBuffer));
+ }
+ } else {
+ if (isNodeNull(BSTNodeUtil.getRightChildFrameIx(parentResult[1], frames, convertBuffer),
+ BSTNodeUtil.getRightChildOffset(parentResult[1], frames, convertBuffer))) {
+ BSTNodeUtil.setRightChild(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0,
+ frames);
+ parentResult[0].copy(parentResult[1]);
+ parentResult[1].set(lastFrame, 0);
+ if (debugDepthCounter > debugMaxDepth) {
+ debugMaxDepth = debugDepthCounter;
+ }
+ return;
+ } else {
+ parentResult[0].copy(parentResult[1]);
+ parentResult[1].set(BSTNodeUtil.getRightChildFrameIx(parentResult[1], frames, convertBuffer),
+ BSTNodeUtil.getRightChildOffset(parentResult[1], frames, convertBuffer));
+ }
+ }
+ }
+ throw new HyracksDataException("New Frame could not be added to BSTMemMgr");
+ }
- if (root.isNull()) {
- root.set(fix, off);
- return;
- }
+ private void insert(int fix, int off, int length) throws HyracksDataException {
+ debugDepthCounter = 0;
+ BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
+ initNewNode(fix, off);
- insertSlot.clear();
- insertSlot.copy(root);
- while (!insertSlot.isNull()) {
- _debug_depth_counter++;
- int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames,
- convertBuffer);
- if (curSlotLen == length) {
- append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix,
- off);
- if (_debug_depth_counter > _debug_max_depth) {
- _debug_max_depth = _debug_depth_counter;
- }
- return;
- }
- if (length < curSlotLen) {
- int lc_fix = BSTNodeUtil.leftChild_fIx(insertSlot, frames,
- convertBuffer);
- int lc_off = BSTNodeUtil.leftChild_offset(insertSlot, frames,
- convertBuffer);
- if (isNodeNull(lc_fix, lc_off)) {
- initNewNode(fix, off);
- BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(),
- insertSlot.getOffset(), fix, off, frames);
- if (_debug_depth_counter > _debug_max_depth) {
- _debug_max_depth = _debug_depth_counter;
- }
- return;
- } else {
- insertSlot.set(lc_fix, lc_off);
- }
- } else {
- int rc_fix = BSTNodeUtil.rightChild_fIx(insertSlot, frames,
- convertBuffer);
- int rc_off = BSTNodeUtil.rightChild_offset(insertSlot, frames,
- convertBuffer);
- if (isNodeNull(rc_fix, rc_off)) {
- initNewNode(fix, off);
- BSTNodeUtil.setRightChild(insertSlot.getFrameIx(),
- insertSlot.getOffset(), fix, off, frames);
- if (_debug_depth_counter > _debug_max_depth) {
- _debug_max_depth = _debug_depth_counter;
- }
- return;
- } else {
- insertSlot.set(rc_fix, rc_off);
- }
- }
- }
- throw new HyracksDataException(
- "Failure in node insertion into BST in BSTMemMgr");
- }
+ if (root.isNull()) {
+ root.set(fix, off);
+ return;
+ }
- /**
- * @param length
- * @param target
- * is the container sent by the caller to hold the results
- */
- private void search(int length, Slot[] target) {
- clear(target);
- result.clear();
+ insertSlot.clear();
+ insertSlot.copy(root);
+ while (!insertSlot.isNull()) {
+ debugDepthCounter++;
+ int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames, convertBuffer);
+ if (curSlotLen == length) {
+ append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off);
+ if (debugDepthCounter > debugMaxDepth) {
+ debugMaxDepth = debugDepthCounter;
+ }
+ return;
+ }
+ if (length < curSlotLen) {
+ int leftChildFIx = BSTNodeUtil.getLeftChildFrameIx(insertSlot, frames, convertBuffer);
+ int leftChildOffset = BSTNodeUtil.getLeftChildOffset(insertSlot, frames, convertBuffer);
+ if (isNodeNull(leftChildFIx, leftChildOffset)) {
+ initNewNode(fix, off);
+ BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
+ if (debugDepthCounter > debugMaxDepth) {
+ debugMaxDepth = debugDepthCounter;
+ }
+ return;
+ } else {
+ insertSlot.set(leftChildFIx, leftChildOffset);
+ }
+ } else {
+ int rightChildFIx = BSTNodeUtil.getRightChildFrameIx(insertSlot, frames, convertBuffer);
+ int rightChildOffset = BSTNodeUtil.getRightChildOffset(insertSlot, frames, convertBuffer);
+ if (isNodeNull(rightChildFIx, rightChildOffset)) {
+ initNewNode(fix, off);
+ BSTNodeUtil.setRightChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
+ if (debugDepthCounter > debugMaxDepth) {
+ debugMaxDepth = debugDepthCounter;
+ }
+ return;
+ } else {
+ insertSlot.set(rightChildFIx, rightChildOffset);
+ }
+ }
+ }
+ throw new HyracksDataException("Failure in node insertion into BST in BSTMemMgr");
+ }
- if (root.isNull()) {
- return;
- }
+ /**
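+ * @param length requested slot length to search for
+ * @param target container sent by the caller to hold the results: target[0] receives the parent and
+ * target[1] the chosen node (exact-length match, else the last node on the search path longer than length)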
+ */
+ private void search(int length, Slot[] target) {
+ clear(target);
+ result.clear();
- Slot lastLeftParent = new Slot();
- Slot lastLeft = new Slot();
- Slot parent = new Slot();
- result.copy(root);
+ if (root.isNull()) {
+ return;
+ }
- while (!result.isNull()) {
- _debug_total_lookup_steps++;
- if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) {
- target[0].copy(parent);
- target[1].copy(result);
- return;
- }
- if (length < BSTNodeUtil.getLength(result, frames, convertBuffer)) {
- lastLeftParent.copy(parent);
- lastLeft.copy(result);
- parent.copy(result);
- int fix = BSTNodeUtil.leftChild_fIx(result, frames,
- convertBuffer);
- int off = BSTNodeUtil.leftChild_offset(result, frames,
- convertBuffer);
- result.set(fix, off);
- } else {
- parent.copy(result);
- int fix = BSTNodeUtil.rightChild_fIx(result, frames,
- convertBuffer);
- int off = BSTNodeUtil.rightChild_offset(result, frames,
- convertBuffer);
- result.set(fix, off);
- }
- }
+ Slot lastLeftParent = new Slot();
+ Slot lastLeft = new Slot();
+ Slot parent = new Slot();
+ result.copy(root);
- target[0].copy(lastLeftParent);
- target[1].copy(lastLeft);
+ while (!result.isNull()) {
+ debugTotalLookupSteps++;
+ if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) {
+ target[0].copy(parent);
+ target[1].copy(result);
+ return;
+ }
+ if (length < BSTNodeUtil.getLength(result, frames, convertBuffer)) {
+ lastLeftParent.copy(parent);
+ lastLeft.copy(result);
+ parent.copy(result);
+ int fix = BSTNodeUtil.getLeftChildFrameIx(result, frames, convertBuffer);
+ int off = BSTNodeUtil.getLeftChildOffset(result, frames, convertBuffer);
+ result.set(fix, off);
+ } else {
+ parent.copy(result);
+ int fix = BSTNodeUtil.getRightChildFrameIx(result, frames, convertBuffer);
+ int off = BSTNodeUtil.getRightChildOffset(result, frames, convertBuffer);
+ result.set(fix, off);
+ }
+ }
- }
+ target[0].copy(lastLeftParent);
+ target[1].copy(lastLeft);
- private void append(int headFix, int headOff, int nodeFix, int nodeOff) {
- initNewNode(nodeFix, nodeOff);
+ }
- int fix = BSTNodeUtil.next_fIx(headFix, headOff, frames, convertBuffer); // frameIx
- // for
- // the
- // current
- // next
- // of
- // head
- int off = BSTNodeUtil.next_offset(headFix, headOff, frames,
- convertBuffer); // offset for the current next of head
- BSTNodeUtil.setNext(nodeFix, nodeOff, fix, off, frames);
+ private void append(int headFix, int headOff, int nodeFix, int nodeOff) {
+ initNewNode(nodeFix, nodeOff);
- if (!isNodeNull(fix, off)) {
- BSTNodeUtil.setPrev(fix, off, nodeFix, nodeOff, frames);
- }
- BSTNodeUtil.setPrev(nodeFix, nodeOff, headFix, headOff, frames);
- BSTNodeUtil.setNext(headFix, headOff, nodeFix, nodeOff, frames);
- }
+ int fix = BSTNodeUtil.getNextFrameIx(headFix, headOff, frames, convertBuffer); // frameIx
+ // for
+ // the
+ // current
+ // next
+ // of
+ // head
+ int off = BSTNodeUtil.getNextOffset(headFix, headOff, frames, convertBuffer); // offset
+ // for
+ // the
+ // current
+ // next
+ // of
+ // head
+ BSTNodeUtil.setNext(nodeFix, nodeOff, fix, off, frames);
- private int[] split(Slot listHead, Slot parent, int length) {
- int l2 = BSTNodeUtil.getLength(listHead, frames, convertBuffer)
- - length - 2 * BSTNodeUtil.HEADER_SIZE;
- // We split the node after slots-list head
- if (!isNodeNull(BSTNodeUtil.next_fIx(listHead, frames, convertBuffer),
- BSTNodeUtil.next_offset(listHead, frames, convertBuffer))) {
- int afterHeadFix = BSTNodeUtil.next_fIx(listHead, frames,
- convertBuffer);
- int afterHeadOff = BSTNodeUtil.next_offset(listHead, frames,
- convertBuffer);
- int afHNextFix = BSTNodeUtil.next_fIx(afterHeadFix, afterHeadOff,
- frames, convertBuffer);
- int afHNextOff = BSTNodeUtil.next_offset(afterHeadFix,
- afterHeadOff, frames, convertBuffer);
- BSTNodeUtil.setNext(listHead.getFrameIx(), listHead.getOffset(),
- afHNextFix, afHNextOff, frames);
- if (!isNodeNull(afHNextFix, afHNextOff)) {
- BSTNodeUtil.setPrev(afHNextFix, afHNextOff,
- listHead.getFrameIx(), listHead.getOffset(), frames);
- }
- int secondOffset = afterHeadOff + length + 2
- * BSTNodeUtil.HEADER_SIZE;
- BSTNodeUtil.setHeaderFooter(afterHeadFix, afterHeadOff, length,
- true, frames);
- BSTNodeUtil.setHeaderFooter(afterHeadFix, secondOffset, l2, true,
- frames);
+ if (!isNodeNull(fix, off)) {
+ BSTNodeUtil.setPrev(fix, off, nodeFix, nodeOff, frames);
+ }
+ BSTNodeUtil.setPrev(nodeFix, nodeOff, headFix, headOff, frames);
+ BSTNodeUtil.setNext(headFix, headOff, nodeFix, nodeOff, frames);
+ }
- return new int[] { afterHeadFix, afterHeadOff, afterHeadFix,
- secondOffset };
- }
- // We split the head
- int secondOffset = listHead.getOffset() + length + 2
- * BSTNodeUtil.HEADER_SIZE;
- BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(),
- listHead.getOffset(), length, true, frames);
- BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), secondOffset, l2,
- true, frames);
+ private int[] split(Slot listHead, Slot parent, int length) {
+ int l2 = BSTNodeUtil.getLength(listHead, frames, convertBuffer) - length - 2 * BSTNodeUtil.HEADER_SIZE;
+ // We split the node after slots-list head
+ if (!isNodeNull(BSTNodeUtil.getNextFrameIx(listHead, frames, convertBuffer),
+ BSTNodeUtil.getNextOffset(listHead, frames, convertBuffer))) {
+ int afterHeadFix = BSTNodeUtil.getNextFrameIx(listHead, frames, convertBuffer);
+ int afterHeadOff = BSTNodeUtil.getNextOffset(listHead, frames, convertBuffer);
+ int afHNextFix = BSTNodeUtil.getNextFrameIx(afterHeadFix, afterHeadOff, frames, convertBuffer);
+ int afHNextOff = BSTNodeUtil.getNextOffset(afterHeadFix, afterHeadOff, frames, convertBuffer);
+ BSTNodeUtil.setNext(listHead.getFrameIx(), listHead.getOffset(), afHNextFix, afHNextOff, frames);
+ if (!isNodeNull(afHNextFix, afHNextOff)) {
+ BSTNodeUtil.setPrev(afHNextFix, afHNextOff, listHead.getFrameIx(), listHead.getOffset(), frames);
+ }
+ int secondOffset = afterHeadOff + length + 2 * BSTNodeUtil.HEADER_SIZE;
+ BSTNodeUtil.setHeaderFooter(afterHeadFix, afterHeadOff, length, true, frames);
+ BSTNodeUtil.setHeaderFooter(afterHeadFix, secondOffset, l2, true, frames);
- fixTreePtrs(listHead.getFrameIx(), listHead.getOffset(),
- parent.getFrameIx(), parent.getOffset());
- return new int[] { listHead.getFrameIx(), listHead.getOffset(),
- listHead.getFrameIx(), secondOffset };
- }
+ return new int[] { afterHeadFix, afterHeadOff, afterHeadFix, secondOffset };
+ }
+ // We split the head
+ int secondOffset = listHead.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE;
+ BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), listHead.getOffset(), length, true, frames);
+ BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), secondOffset, l2, true, frames);
- private void fixTreePtrs(int node_fix, int node_off, int par_fix,
- int par_off) {
- int nlc_fix = BSTNodeUtil.leftChild_fIx(node_fix, node_off, frames,
- convertBuffer);
- int nlc_off = BSTNodeUtil.leftChild_offset(node_fix, node_off, frames,
- convertBuffer);
- int nrc_fix = BSTNodeUtil.rightChild_fIx(node_fix, node_off, frames,
- convertBuffer);
- int nrc_off = BSTNodeUtil.rightChild_offset(node_fix, node_off, frames,
- convertBuffer);
+ fixTreePtrs(listHead.getFrameIx(), listHead.getOffset(), parent.getFrameIx(), parent.getOffset());
+ return new int[] { listHead.getFrameIx(), listHead.getOffset(), listHead.getFrameIx(), secondOffset };
+ }
- int status = -1; // (status==0 if node is left child of parent)
- // (status==1 if node is right child of parent)
- if (!isNodeNull(par_fix, par_off)) {
- int nlen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
- node_fix, node_off, frames, convertBuffer));
- int plen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
- par_fix, par_off, frames, convertBuffer));
- status = ((nlen < plen) ? 0 : 1);
- }
+ private void fixTreePtrs(int nodeFrameIx, int nodeOffset, int parentFrameIx, int parentOffset) {
+ int nodeLeftChildFrameIx = BSTNodeUtil.getLeftChildFrameIx(nodeFrameIx, nodeOffset, frames, convertBuffer);
+ int nodeLeftChildOffset = BSTNodeUtil.getLeftChildOffset(nodeFrameIx, nodeOffset, frames, convertBuffer);
+ int nodeRightChildFrameIx = BSTNodeUtil.getRightChildFrameIx(nodeFrameIx, nodeOffset, frames, convertBuffer);
+ int nodeRightChildOffset = BSTNodeUtil.getRightChildOffset(nodeFrameIx, nodeOffset, frames, convertBuffer);
- if (!isNodeNull(nlc_fix, nlc_off) && !isNodeNull(nrc_fix, nrc_off)) { // Node
- // has
- // two
- // children
- int pmin_fix = node_fix;
- int pmin_off = node_off;
- int min_fix = nrc_fix;
- int min_off = nrc_off;
- int next_left_fix = BSTNodeUtil.leftChild_fIx(min_fix, min_off,
- frames, convertBuffer);
- int next_left_off = BSTNodeUtil.leftChild_offset(min_fix, min_off,
- frames, convertBuffer);
+ int status = -1; // (status==0 if node is left child of parent)
+ // (status==1 if node is right child of parent)
+ if (!isNodeNull(parentFrameIx, parentOffset)) {
+ int nlen = BSTNodeUtil.getActualLength(BSTNodeUtil
+ .getLength(nodeFrameIx, nodeOffset, frames, convertBuffer));
+ int plen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(parentFrameIx, parentOffset, frames,
+ convertBuffer));
+ status = ((nlen < plen) ? 0 : 1);
+ }
- while (!isNodeNull(next_left_fix, next_left_off)) {
- pmin_fix = min_fix;
- pmin_off = min_off;
- min_fix = next_left_fix;
- min_off = next_left_off;
- next_left_fix = BSTNodeUtil.leftChild_fIx(min_fix, min_off,
- frames, convertBuffer); // min is now pointing to
- // current (old) next left
- next_left_off = BSTNodeUtil.leftChild_offset(min_fix, min_off,
- frames, convertBuffer); // min is now pointing to
- // current (old) next left
- }
+ if (!isNodeNull(nodeLeftChildFrameIx, nodeLeftChildOffset)
+ && !isNodeNull(nodeRightChildFrameIx, nodeRightChildOffset)) { // Node
+ // has
+ // two
+ // children
+ int pMinFIx = nodeFrameIx;
+ int pMinOff = nodeOffset;
+ int minFIx = nodeRightChildFrameIx;
+ int minOff = nodeRightChildOffset;
+ int nextLeftFIx = BSTNodeUtil.getLeftChildFrameIx(minFIx, minOff, frames, convertBuffer);
+ int nextLeftOff = BSTNodeUtil.getLeftChildOffset(minFIx, minOff, frames, convertBuffer);
- if ((nrc_fix == min_fix) && (nrc_off == min_off)) { // nrc is the
- // same as min
- BSTNodeUtil.setLeftChild(nrc_fix, nrc_off, nlc_fix, nlc_off,
- frames);
- } else { // min is different from nrc
- int min_right_fix = BSTNodeUtil.rightChild_fIx(min_fix,
- min_off, frames, convertBuffer);
- int min_right_off = BSTNodeUtil.rightChild_offset(min_fix,
- min_off, frames, convertBuffer);
- BSTNodeUtil.setRightChild(min_fix, min_off, nrc_fix, nrc_off,
- frames);
- BSTNodeUtil.setLeftChild(min_fix, min_off, nlc_fix, nlc_off,
- frames);
- BSTNodeUtil.setLeftChild(pmin_fix, pmin_off, min_right_fix,
- min_right_off, frames);
- }
+ while (!isNodeNull(nextLeftFIx, nextLeftOff)) {
+ pMinFIx = minFIx;
+ pMinOff = minOff;
+ minFIx = nextLeftFIx;
+ minOff = nextLeftOff;
+ nextLeftFIx = BSTNodeUtil.getLeftChildFrameIx(minFIx, minOff, frames, convertBuffer); // min
+ // is
+ // now
+ // pointing
+ // to
+ // current
+ // (old)
+ // next
+ // left
+ nextLeftOff = BSTNodeUtil.getLeftChildOffset(minFIx, minOff, frames, convertBuffer); // min
+ // is
+ // now
+ // pointing
+ // to
+ // current
+ // (old)
+ // next
+ // left
+ }
- // Now dealing with the parent
- if (!isNodeNull(par_fix, par_off)) {
- if (status == 0) {
- BSTNodeUtil.setLeftChild(par_fix, par_off, min_fix,
- min_off, frames);
- } else if (status == 1) {
- BSTNodeUtil.setRightChild(par_fix, par_off, min_fix,
- min_off, frames);
- }
- } else { // No parent (node was the root)
- root.set(min_fix, min_off);
- }
- return;
- }
+ if ((nodeRightChildFrameIx == minFIx) && (nodeRightChildOffset == minOff)) { // nrc
+ // is
+ // the
+ // same as min
+ BSTNodeUtil.setLeftChild(nodeRightChildFrameIx, nodeRightChildOffset, nodeLeftChildFrameIx,
+ nodeLeftChildOffset, frames);
+ } else { // min is different from nrc
+ int minRightFIx = BSTNodeUtil.getRightChildFrameIx(minFIx, minOff, frames, convertBuffer);
+ int minRightOffset = BSTNodeUtil.getRightChildOffset(minFIx, minOff, frames, convertBuffer);
+ BSTNodeUtil.setRightChild(minFIx, minOff, nodeRightChildFrameIx, nodeRightChildOffset, frames);
+ BSTNodeUtil.setLeftChild(minFIx, minOff, nodeLeftChildFrameIx, nodeLeftChildOffset, frames);
+ BSTNodeUtil.setLeftChild(pMinFIx, pMinOff, minRightFIx, minRightOffset, frames);
+ }
- else if (!isNodeNull(nlc_fix, nlc_off)) { // Node has only left child
- if (status == 0) {
- BSTNodeUtil.setLeftChild(par_fix, par_off, nlc_fix, nlc_off,
- frames);
- } else if (status == 1) {
- BSTNodeUtil.setRightChild(par_fix, par_off, nlc_fix, nlc_off,
- frames);
- } else if (status == -1) { // No parent, so node is root
- root.set(nlc_fix, nlc_off);
- }
- return;
- }
+ // Now dealing with the parent
+ if (!isNodeNull(parentFrameIx, parentOffset)) {
+ if (status == 0) {
+ BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, minFIx, minOff, frames);
+ } else if (status == 1) {
+ BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, minFIx, minOff, frames);
+ }
+ } else { // No parent (node was the root)
+ root.set(minFIx, minOff);
+ }
+ return;
+ }
- else if (!isNodeNull(nrc_fix, nrc_off)) { // Node has only right child
- if (status == 0) {
- BSTNodeUtil.setLeftChild(par_fix, par_off, nrc_fix, nrc_off,
- frames);
- } else if (status == 1) {
- BSTNodeUtil.setRightChild(par_fix, par_off, nrc_fix, nrc_off,
- frames);
- } else if (status == -1) { // No parent, so node is root
- root.set(nrc_fix, nrc_off);
- }
- return;
- }
+ else if (!isNodeNull(nodeLeftChildFrameIx, nodeLeftChildOffset)) { // Node
+ // has
+ // only
+ // left
+ // child
+ if (status == 0) {
+ BSTNodeUtil
+ .setLeftChild(parentFrameIx, parentOffset, nodeLeftChildFrameIx, nodeLeftChildOffset, frames);
+ } else if (status == 1) {
+ BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, nodeLeftChildFrameIx, nodeLeftChildOffset,
+ frames);
+ } else if (status == -1) { // No parent, so node is root
+ root.set(nodeLeftChildFrameIx, nodeLeftChildOffset);
+ }
+ return;
+ }
- else { // Node is leaf (no children)
- if (status == 0) {
- BSTNodeUtil.setLeftChild(par_fix, par_off,
- BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX,
- frames);
- } else if (status == 1) {
- BSTNodeUtil.setRightChild(par_fix, par_off,
- BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX,
- frames);
- } else { // node was the only node in the tree
- root.clear();
- }
- return;
- }
- }
+ else if (!isNodeNull(nodeRightChildFrameIx, nodeRightChildOffset)) { // Node
+ // has
+ // only
+ // right
+ // child
+ if (status == 0) {
+ BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, nodeRightChildFrameIx, nodeRightChildOffset,
+ frames);
+ } else if (status == 1) {
+ BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, nodeRightChildFrameIx, nodeRightChildOffset,
+ frames);
+ } else if (status == -1) { // No parent, so node is root
+ root.set(nodeRightChildFrameIx, nodeRightChildOffset);
+ }
+ return;
+ }
- /**
- * Allocation with no splitting but padding
- *
- * @param node
- * @param parent
- * @param result
- * is the container sent by the caller to hold the results
- */
- private void allocate(Slot node, Slot parent, int length, Slot result) {
- int nextFix = BSTNodeUtil.next_fIx(node, frames, convertBuffer);
- int nextOff = BSTNodeUtil.next_offset(node, frames, convertBuffer);
- if (!isNodeNull(nextFix, nextOff)) {
- int nextOfNext_fix = BSTNodeUtil.next_fIx(nextFix, nextOff, frames,
- convertBuffer);
- int nextOfNext_off = BSTNodeUtil.next_offset(nextFix, nextOff,
- frames, convertBuffer);
- BSTNodeUtil.setNext(node.getFrameIx(), node.getOffset(),
- nextOfNext_fix, nextOfNext_off, frames);
- if (!isNodeNull(nextOfNext_fix, nextOfNext_off)) {
- BSTNodeUtil.setPrev(nextOfNext_fix, nextOfNext_off,
- node.getFrameIx(), node.getOffset(), frames);
- }
- BSTNodeUtil
- .setHeaderFooter(nextFix, nextOff, length, false, frames);
- result.set(nextFix, nextOff);
- return;
- }
+ else { // Node is leaf (no children)
+ if (status == 0) {
+ BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ } else if (status == 1) {
+ BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ } else { // node was the only node in the tree
+ root.clear();
+ }
+ return;
+ }
+ }
- fixTreePtrs(node.getFrameIx(), node.getOffset(), parent.getFrameIx(),
- parent.getOffset());
- BSTNodeUtil.setHeaderFooter(node.getFrameIx(), node.getOffset(),
- length, false, frames);
- result.copy(node);
- }
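+ /**
+ * Allocates a slot without splitting it (padding is used instead).
+ *
+ * @param node head slot of a free-slot class (a BST node) to allocate from
+ * @param parent parent of node in the BST
+ * @param length length recorded in the header/footer of the allocated slot
+ * @param result the container sent by the caller to hold the results
+ */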
+ private void allocate(Slot node, Slot parent, int length, Slot result) {
+ int nextFix = BSTNodeUtil.getNextFrameIx(node, frames, convertBuffer);
+ int nextOff = BSTNodeUtil.getNextOffset(node, frames, convertBuffer);
+ if (!isNodeNull(nextFix, nextOff)) {
+ int nextOfNextFIx = BSTNodeUtil.getNextFrameIx(nextFix, nextOff, frames, convertBuffer);
+ int nextOfNextOffset = BSTNodeUtil.getNextOffset(nextFix, nextOff, frames, convertBuffer);
+ BSTNodeUtil.setNext(node.getFrameIx(), node.getOffset(), nextOfNextFIx, nextOfNextOffset, frames);
+ if (!isNodeNull(nextOfNextFIx, nextOfNextOffset)) {
+ BSTNodeUtil.setPrev(nextOfNextFIx, nextOfNextOffset, node.getFrameIx(), node.getOffset(), frames);
+ }
+ BSTNodeUtil.setHeaderFooter(nextFix, nextOff, length, false, frames);
+ result.set(nextFix, nextOff);
+ return;
+ }
- private void removeFromList(int fix, int off) {
- int nx_fix = BSTNodeUtil.next_fIx(fix, off, frames, convertBuffer);
- int nx_off = BSTNodeUtil.next_offset(fix, off, frames, convertBuffer);
- int prev_fix = BSTNodeUtil.prev_fIx(fix, off, frames, convertBuffer);
- int prev_off = BSTNodeUtil.prev_offset(fix, off, frames, convertBuffer);
- if (!isNodeNull(prev_fix, prev_off) && !isNodeNull(nx_fix, nx_off)) {
- BSTNodeUtil.setNext(prev_fix, prev_off, nx_fix, nx_off, frames);
- BSTNodeUtil.setPrev(nx_fix, nx_off, prev_fix, prev_off, frames);
- BSTNodeUtil.setNext(fix, off, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- return;
- }
- if (!isNodeNull(prev_fix, prev_off)) {
- BSTNodeUtil.setNext(prev_fix, prev_off, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- return;
- }
+ fixTreePtrs(node.getFrameIx(), node.getOffset(), parent.getFrameIx(), parent.getOffset());
+ BSTNodeUtil.setHeaderFooter(node.getFrameIx(), node.getOffset(), length, false, frames);
+ result.copy(node);
+ }
- // We need to find the parent, so we can fix the tree
- int par_fix = BSTNodeUtil.INVALID_INDEX;
- int par_off = BSTNodeUtil.INVALID_INDEX;
- int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(fix,
- off, frames, convertBuffer));
- fix = root.getFrameIx();
- off = root.getOffset();
- int curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
- while (length != curLen) {
- par_fix = fix;
- par_off = off;
- if (length < curLen) {
- fix = BSTNodeUtil.leftChild_fIx(par_fix, par_off, frames,
- convertBuffer); // par_fix is now the old(current) fix
- off = BSTNodeUtil.leftChild_offset(par_fix, par_off, frames,
- convertBuffer); // par_off is now the old(current) off
- } else {
- fix = BSTNodeUtil.rightChild_fIx(par_fix, par_off, frames,
- convertBuffer); // par_fix is now the old(current) fix
- off = BSTNodeUtil.rightChild_offset(par_fix, par_off, frames,
- convertBuffer); // par_off is now the old(current) off
- }
- curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
- }
+ private void removeFromList(int fix, int off) {
+ int nextFIx = BSTNodeUtil.getNextFrameIx(fix, off, frames, convertBuffer);
+ int nextOffset = BSTNodeUtil.getNextOffset(fix, off, frames, convertBuffer);
+ int prevFIx = BSTNodeUtil.getPrevFrameIx(fix, off, frames, convertBuffer);
+ int prevOffset = BSTNodeUtil.getPrevOffset(fix, off, frames, convertBuffer);
+ if (!isNodeNull(prevFIx, prevOffset) && !isNodeNull(nextFIx, nextOffset)) {
+ BSTNodeUtil.setNext(prevFIx, prevOffset, nextFIx, nextOffset, frames);
+ BSTNodeUtil.setPrev(nextFIx, nextOffset, prevFIx, prevOffset, frames);
+ BSTNodeUtil.setNext(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+ return;
+ }
+ if (!isNodeNull(prevFIx, prevOffset)) {
+ BSTNodeUtil.setNext(prevFIx, prevOffset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+ return;
+ }
- if (!isNodeNull(nx_fix, nx_off)) { // it is head of the list (in the
- // tree)
- BSTNodeUtil.setPrev(nx_fix, nx_off, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- int node_lc_fix = BSTNodeUtil.leftChild_fIx(fix, off, frames,
- convertBuffer);
- int node_lc_off = BSTNodeUtil.leftChild_offset(fix, off, frames,
- convertBuffer);
- int node_rc_fix = BSTNodeUtil.rightChild_fIx(fix, off, frames,
- convertBuffer);
- int node_rc_off = BSTNodeUtil.rightChild_offset(fix, off, frames,
- convertBuffer);
- BSTNodeUtil.setLeftChild(nx_fix, nx_off, node_lc_fix, node_lc_off,
- frames);
- BSTNodeUtil.setRightChild(nx_fix, nx_off, node_rc_fix, node_rc_off,
- frames);
- if (!isNodeNull(par_fix, par_off)) {
- int parentLength = BSTNodeUtil.getLength(par_fix, par_off,
- frames, convertBuffer);
- if (length < parentLength) {
- BSTNodeUtil.setLeftChild(par_fix, par_off, nx_fix, nx_off,
- frames);
- } else {
- BSTNodeUtil.setRightChild(par_fix, par_off, nx_fix, nx_off,
- frames);
- }
- }
+ // We need to find the parent, so we can fix the tree
+ int parentFIx = BSTNodeUtil.INVALID_INDEX;
+ int parentOffset = BSTNodeUtil.INVALID_INDEX;
+ int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(fix, off, frames, convertBuffer));
+ fix = root.getFrameIx();
+ off = root.getOffset();
+ int curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+ while (length != curLen) {
+ parentFIx = fix;
+ parentOffset = off;
+ if (length < curLen) {
+ fix = BSTNodeUtil.getLeftChildFrameIx(parentFIx, parentOffset, frames, convertBuffer); // parentFIx
+ // is
+ // now
+ // the
+ // old(current)
+ // fix
+ off = BSTNodeUtil.getLeftChildOffset(parentFIx, parentOffset, frames, convertBuffer); // parentOffset
+ // is
+ // now
+ // the
+ // old(current)
+ // off
+ } else {
+ fix = BSTNodeUtil.getRightChildFrameIx(parentFIx, parentOffset, frames, convertBuffer); // parentFIx
+ // is
+ // now
+ // the
+ // old(current)
+ // fix
+ off = BSTNodeUtil.getRightChildOffset(parentFIx, parentOffset, frames, convertBuffer); // parentOffset
+ // is
+ // now
+ // the
+ // old(current)
+ // off
+ }
+ curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+ }
- if ((root.getFrameIx() == fix) && (root.getOffset() == off)) {
- root.set(nx_fix, nx_off);
- }
+ if (!isNodeNull(nextFIx, nextOffset)) { // it is head of the list (in
+ // the
+ // tree)
+ BSTNodeUtil.setPrev(nextFIx, nextOffset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+ int nodeLeftChildFIx = BSTNodeUtil.getLeftChildFrameIx(fix, off, frames, convertBuffer);
+ int nodeLeftChildOffset = BSTNodeUtil.getLeftChildOffset(fix, off, frames, convertBuffer);
+ int nodeRightChildFix = BSTNodeUtil.getRightChildFrameIx(fix, off, frames, convertBuffer);
+ int nodeRightChildOffset = BSTNodeUtil.getRightChildOffset(fix, off, frames, convertBuffer);
+ BSTNodeUtil.setLeftChild(nextFIx, nextOffset, nodeLeftChildFIx, nodeLeftChildOffset, frames);
+ BSTNodeUtil.setRightChild(nextFIx, nextOffset, nodeRightChildFix, nodeRightChildOffset, frames);
+ if (!isNodeNull(parentFIx, parentOffset)) {
+ int parentLength = BSTNodeUtil.getLength(parentFIx, parentOffset, frames, convertBuffer);
+ if (length < parentLength) {
+ BSTNodeUtil.setLeftChild(parentFIx, parentOffset, nextFIx, nextOffset, frames);
+ } else {
+ BSTNodeUtil.setRightChild(parentFIx, parentOffset, nextFIx, nextOffset, frames);
+ }
+ }
- return;
- }
+ if ((root.getFrameIx() == fix) && (root.getOffset() == off)) {
+ root.set(nextFIx, nextOffset);
+ }
- fixTreePtrs(fix, off, par_fix, par_off);
- }
+ return;
+ }
- private void clear(Slot[] s) {
- s[0].clear();
- s[1].clear();
- }
+ fixTreePtrs(fix, off, parentFIx, parentOffset);
+ }
- private boolean isNodeNull(int frameIx, int offset) {
- return ((frameIx == BSTNodeUtil.INVALID_INDEX)
- || (offset == BSTNodeUtil.INVALID_INDEX) || (frames[frameIx] == null));
- }
+ private void clear(Slot[] s) {
+ s[0].clear();
+ s[1].clear();
+ }
- private boolean shouldSplit(int slotLength, int reqLength) {
- return ((slotLength - reqLength) >= BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE);
- }
+ private boolean isNodeNull(int frameIx, int offset) {
+ return ((frameIx == BSTNodeUtil.INVALID_INDEX) || (offset == BSTNodeUtil.INVALID_INDEX) || (frames[frameIx] == null));
+ }
- private void initNewNode(int frameIx, int offset) {
- BSTNodeUtil.setLeftChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setRightChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setNext(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setPrev(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- }
+ private boolean shouldSplit(int slotLength, int reqLength) {
+ return ((slotLength - reqLength) >= BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE);
+ }
- private ByteBuffer allocateFrame() {
- return ctx.allocateFrame();
- }
+ private void initNewNode(int frameIx, int offset) {
+ BSTNodeUtil.setLeftChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setRightChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setNext(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setPrev(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+ }
- public String _debug_printMemory() {
- _debug_free_slots = 0;
- Slot s = new Slot(0, 0);
- if (s.isNull()) {
- return "memory:\tNull";
- }
+ private ByteBuffer allocateFrame() {
+ return ctx.allocateFrame();
+ }
- if (BSTNodeUtil.isFree(0, 0, frames)) {
- _debug_free_slots++;
- }
+ public String debugPrintMemory() {
+ debugFreeSlots = 0;
+ Slot s = new Slot(0, 0);
+ if (s.isNull()) {
+ return "memory:\tNull";
+ }
- String m = "memory:\n" + _debug_printSlot(0, 0) + "\n";
- int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(0, 0,
- frames, convertBuffer));
- int noff = (length + 2 * BSTNodeUtil.HEADER_SIZE >= FRAME_SIZE ? BSTNodeUtil.INVALID_INDEX
- : length + 2 * BSTNodeUtil.HEADER_SIZE);
- int nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length == 1) ? BSTNodeUtil.INVALID_INDEX
- : 1)
- : 0);
- if (noff == BSTNodeUtil.INVALID_INDEX
- && nfix != BSTNodeUtil.INVALID_INDEX) {
- noff = 0;
- }
- s.set(nfix, noff);
- while (!isNodeNull(s.getFrameIx(), s.getOffset())) {
- if (BSTNodeUtil.isFree(s.getFrameIx(), s.getOffset(), frames)) {
- _debug_free_slots++;
- }
- m += _debug_printSlot(s.getFrameIx(), s.getOffset()) + "\n";
- length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
- s.getFrameIx(), s.getOffset(), frames, convertBuffer));
- noff = (s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE >= FRAME_SIZE ? BSTNodeUtil.INVALID_INDEX
- : s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE);
- nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length - 1 == s
- .getFrameIx()) ? BSTNodeUtil.INVALID_INDEX
- : s.getFrameIx() + 1) : s.getFrameIx());
- if (noff == BSTNodeUtil.INVALID_INDEX
- && nfix != BSTNodeUtil.INVALID_INDEX) {
- noff = 0;
- }
- s.set(nfix, noff);
- }
- return m + "\nFree Slots:\t" + _debug_free_slots;
- }
+ if (BSTNodeUtil.isFree(0, 0, frames)) {
+ debugFreeSlots++;
+ }
- public String _debug_printTree() {
- _debug_tree_size = 0;
- Slot node = new Slot();
- node.copy(root);
- if (!node.isNull()) {
- _debug_tree_size++;
- return _debug_printSubTree(node) + "\nTree Nodes:\t"
- + _debug_tree_size;
- }
- return "Null";
- }
+ String m = "memory:\n" + debugPrintSlot(0, 0) + "\n";
+ int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(0, 0, frames, convertBuffer));
+ int noff = (length + 2 * BSTNodeUtil.HEADER_SIZE >= frameSize ? BSTNodeUtil.INVALID_INDEX : length + 2
+ * BSTNodeUtil.HEADER_SIZE);
+ int nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length == 1) ? BSTNodeUtil.INVALID_INDEX : 1) : 0);
+ if (noff == BSTNodeUtil.INVALID_INDEX && nfix != BSTNodeUtil.INVALID_INDEX) {
+ noff = 0;
+ }
+ s.set(nfix, noff);
+ while (!isNodeNull(s.getFrameIx(), s.getOffset())) {
+ if (BSTNodeUtil.isFree(s.getFrameIx(), s.getOffset(), frames)) {
+ debugFreeSlots++;
+ }
+ m += debugPrintSlot(s.getFrameIx(), s.getOffset()) + "\n";
+ length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(s.getFrameIx(), s.getOffset(), frames,
+ convertBuffer));
+ noff = (s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE >= frameSize ? BSTNodeUtil.INVALID_INDEX : s
+ .getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE);
+ nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length - 1 == s.getFrameIx()) ? BSTNodeUtil.INVALID_INDEX
+ : s.getFrameIx() + 1)
+ : s.getFrameIx());
+ if (noff == BSTNodeUtil.INVALID_INDEX && nfix != BSTNodeUtil.INVALID_INDEX) {
+ noff = 0;
+ }
+ s.set(nfix, noff);
+ }
+ return m + "\nFree Slots:\t" + debugFreeSlots;
+ }
- private String _debug_printSubTree(Slot r) {
- Slot node = new Slot();
- node.copy(r);
- int fix = node.getFrameIx();
- int off = node.getOffset();
- int lfix = BSTNodeUtil.leftChild_fIx(node, frames, convertBuffer);
- int loff = BSTNodeUtil.leftChild_offset(node, frames, convertBuffer);
- int rfix = BSTNodeUtil.rightChild_fIx(node, frames, convertBuffer);
- int roff = BSTNodeUtil.rightChild_offset(node, frames, convertBuffer);
- int nfix = BSTNodeUtil.next_fIx(node, frames, convertBuffer);
- int noff = BSTNodeUtil.next_offset(node, frames, convertBuffer);
- int pfix = BSTNodeUtil.prev_fIx(node, frames, convertBuffer);
- int poff = BSTNodeUtil.prev_offset(node, frames, convertBuffer);
+ public String debugPrintTree() {
+ debugTreeSize = 0;
+ Slot node = new Slot();
+ node.copy(root);
+ if (!node.isNull()) {
+ debugTreeSize++;
+ return debugPrintSubTree(node) + "\nTree Nodes:\t" + debugTreeSize;
+ }
+ return "Null";
+ }
- String s = "{" + r.getFrameIx() + ", " + r.getOffset() + " (Len: "
- + BSTNodeUtil.getLength(fix, off, frames, convertBuffer)
- + ") - " + "(LC: " + _debug_printSlot(lfix, loff) + ") - "
- + "(RC: " + _debug_printSlot(rfix, roff) + ") - " + "(NX: "
- + _debug_printSlot(nfix, noff) + ") - " + "(PR: "
- + _debug_printSlot(pfix, poff) + ") }\n";
- if (!isNodeNull(lfix, loff)) {
- _debug_tree_size++;
- s += _debug_printSubTree(new Slot(lfix, loff)) + "\n";
- }
- if (!isNodeNull(rfix, roff)) {
- _debug_tree_size++;
- s += _debug_printSubTree(new Slot(rfix, roff)) + "\n";
- }
+ private String debugPrintSubTree(Slot r) {
+ Slot node = new Slot();
+ node.copy(r);
+ int fix = node.getFrameIx();
+ int off = node.getOffset();
+ int lfix = BSTNodeUtil.getLeftChildFrameIx(node, frames, convertBuffer);
+ int loff = BSTNodeUtil.getLeftChildOffset(node, frames, convertBuffer);
+ int rfix = BSTNodeUtil.getRightChildFrameIx(node, frames, convertBuffer);
+ int roff = BSTNodeUtil.getRightChildOffset(node, frames, convertBuffer);
+ int nfix = BSTNodeUtil.getNextFrameIx(node, frames, convertBuffer);
+ int noff = BSTNodeUtil.getNextOffset(node, frames, convertBuffer);
+ int pfix = BSTNodeUtil.getPrevFrameIx(node, frames, convertBuffer);
+ int poff = BSTNodeUtil.getPrevOffset(node, frames, convertBuffer);
- return s;
- }
+ String s = "{" + r.getFrameIx() + ", " + r.getOffset() + " (Len: "
+ + BSTNodeUtil.getLength(fix, off, frames, convertBuffer) + ") - " + "(LC: "
+ + debugPrintSlot(lfix, loff) + ") - " + "(RC: " + debugPrintSlot(rfix, roff) + ") - " + "(NX: "
+ + debugPrintSlot(nfix, noff) + ") - " + "(PR: " + debugPrintSlot(pfix, poff) + ") }\n";
+ if (!isNodeNull(lfix, loff)) {
+ debugTreeSize++;
+ s += debugPrintSubTree(new Slot(lfix, loff)) + "\n";
+ }
+ if (!isNodeNull(rfix, roff)) {
+ debugTreeSize++;
+ s += debugPrintSubTree(new Slot(rfix, roff)) + "\n";
+ }
- private String _debug_printSlot(int fix, int off) {
- if (isNodeNull(fix, off)) {
- return BSTNodeUtil.INVALID_INDEX + ", " + BSTNodeUtil.INVALID_INDEX;
- }
- int l = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
- int al = BSTNodeUtil.getActualLength(l);
- boolean f = BSTNodeUtil.isFree(fix, off, frames);
- return fix + ", " + off + " (free: " + f + ") (Len: " + l
- + ") (actual len: " + al + ") ";
- }
+ return s;
+ }
+
+ private String debugPrintSlot(int fix, int off) {
+ if (isNodeNull(fix, off)) {
+ return BSTNodeUtil.INVALID_INDEX + ", " + BSTNodeUtil.INVALID_INDEX;
+ }
+ int l = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+ int al = BSTNodeUtil.getActualLength(l);
+ boolean f = BSTNodeUtil.isFree(fix, off, frames);
+ return fix + ", " + off + " (free: " + f + ") (Len: " + l + ") (actual len: " + al + ") ";
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
index dc1bffa..0294dc0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
@@ -8,270 +8,214 @@
* Implements utility methods, used extensively and repeatedly within
* the BSTMemMgr.
*
- * Main functionality includes methods to set/get different types of
- * pointers, required and accessed within BST traversal, along with the
- * methods for setting/getting length/header/footer of free slots, which
- * have been used as the containers for BST nodes.
+ * Mainly provides methods to set/get the different kinds of pointers
+ * needed during BST traversal, along with the methods for
+ * setting/getting the length/header/footer of free slots, which are
+ * used as the containers for BST nodes.
*/
public class BSTNodeUtil {
- static final int TOTAL_FRAME_SIZE = BSTMemMgr.FRAME_SIZE;
- static final int MINIMUM_FREE_SLOT_SIZE = 32;
+ static final int MINIMUM_FREE_SLOT_SIZE = 32;
- static final int FRAME_PTR_SIZE = 4;
- static final int OFFSET_SIZE = 2;
+ private static final int FRAME_PTR_SIZE = 4;
+ private static final int OFFSET_SIZE = 2;
- static final int HEADER_SIZE = 2;
- static final int HEADER_INDEX = 0;
+ static final int HEADER_SIZE = 2;
+ private static final int HEADER_INDEX = 0;
- static final int LEFT_CHILD_FRAME_INDEX = HEADER_INDEX + HEADER_SIZE;
- static final int LEFT_CHILD_OFFSET_INDEX = LEFT_CHILD_FRAME_INDEX
- + FRAME_PTR_SIZE;
+ private static final int LEFT_CHILD_FRAME_INDEX = HEADER_INDEX + HEADER_SIZE;
+ private static final int LEFT_CHILD_OFFSET_INDEX = LEFT_CHILD_FRAME_INDEX + FRAME_PTR_SIZE;
- static final int RIGHT_CHILD_FRAME_INDEX = LEFT_CHILD_OFFSET_INDEX
- + OFFSET_SIZE;
- static final int RIGHT_CHILD_OFFSET_INDEX = RIGHT_CHILD_FRAME_INDEX
- + FRAME_PTR_SIZE;
+ private static final int RIGHT_CHILD_FRAME_INDEX = LEFT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
+ private static final int RIGHT_CHILD_OFFSET_INDEX = RIGHT_CHILD_FRAME_INDEX + FRAME_PTR_SIZE;
- static final int NEXT_FRAME_INDEX = RIGHT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
- static final int NEXT_OFFSET_INDEX = NEXT_FRAME_INDEX + FRAME_PTR_SIZE;
+ private static final int NEXT_FRAME_INDEX = RIGHT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
+ private static final int NEXT_OFFSET_INDEX = NEXT_FRAME_INDEX + FRAME_PTR_SIZE;
- static final int PREV_FRAME_INDEX = NEXT_OFFSET_INDEX + OFFSET_SIZE;
- static final int PREV_OFFSET_INDEX = PREV_FRAME_INDEX + FRAME_PTR_SIZE;
+ private static final int PREV_FRAME_INDEX = NEXT_OFFSET_INDEX + OFFSET_SIZE;
+ private static final int PREV_OFFSET_INDEX = PREV_FRAME_INDEX + FRAME_PTR_SIZE;
- private static final byte INVALID = -128;
- private static final byte MASK = 127;
- static final int INVALID_INDEX = -1;
+ private static final byte INVALID = -128;
+ private static final byte MASK = 127;
+ static final int INVALID_INDEX = -1;
- /*
- * Structure of a free slot:
- * [HEADER][LEFT_CHILD][RIGHT_CHILD][NEXT][PREV]...[FOOTER] MSB in the
- * HEADER is set to 1 (invalid content)
- *
- * Structure of a used slot: [HEADER]...[FOOTER] MSB in the HEADER is set to
- * 0 (valid content)
- */
+ /*
+ * Structure of a free slot:
+ * [HEADER][LEFT_CHILD][RIGHT_CHILD][NEXT][PREV]...[FOOTER] MSB in the
+ * HEADER is set to 1 in a free slot
+ *
+ * Structure of a used slot: [HEADER]...[FOOTER] MSB in the HEADER is set to
+ * 0 in a used slot
+ */
- static int leftChild_fIx(Slot s, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return leftChild_fIx(s.getFrameIx(), s.getOffset(), frames,
- convertBuffer);
- }
+ static int getLeftChildFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getLeftChildFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
- static int leftChild_offset(Slot s, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return leftChild_offset(s.getFrameIx(), s.getOffset(), frames,
- convertBuffer);
- }
+ static int getLeftChildOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getLeftChildOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
- static int leftChild_fIx(int frameIx, int offset, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_FRAME_INDEX,
- FRAME_PTR_SIZE, convertBuffer));
+ static int getLeftChildFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
- }
+ }
- static int leftChild_offset(int frameIx, int offset, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx],
- offset + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
- }
+ static int getLeftChildOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+ }
- static void setLeftChild(Slot node, Slot lc, ByteBuffer[] frames) {
- setLeftChild(node.getFrameIx(), node.getOffset(), lc.getFrameIx(),
- lc.getOffset(), frames);
- }
+ static void setLeftChild(Slot node, Slot lc, ByteBuffer[] frames) {
+ setLeftChild(node.getFrameIx(), node.getOffset(), lc.getFrameIx(), lc.getOffset(), frames);
+ }
- static void setLeftChild(int nodeFix, int nodeOff, int lcFix, int lcOff,
- ByteBuffer[] frames) {
- storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_FRAME_INDEX,
- FRAME_PTR_SIZE, lcFix);
- storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_OFFSET_INDEX,
- OFFSET_SIZE, lcOff);
- }
+ static void setLeftChild(int nodeFix, int nodeOff, int lcFix, int lcOff, ByteBuffer[] frames) {
+ storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, lcFix);
+ storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, lcOff);
+ }
- static int rightChild_fIx(Slot s, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return rightChild_fIx(s.getFrameIx(), s.getOffset(), frames,
- convertBuffer);
- }
+ static int getRightChildFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getRightChildFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
- static int rightChild_offset(Slot s, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return rightChild_offset(s.getFrameIx(), s.getOffset(), frames,
- convertBuffer);
- }
+ static int getRightChildOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getRightChildOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
- static int rightChild_fIx(int frameIx, int offset, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx],
- offset + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
- }
+ static int getRightChildFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
+ }
- static int rightChild_offset(int frameIx, int offset, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset
- + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
- }
+ static int getRightChildOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+ }
- static void setRightChild(Slot node, Slot rc, ByteBuffer[] frames) {
- setRightChild(node.getFrameIx(), node.getOffset(), rc.getFrameIx(),
- rc.getOffset(), frames);
- }
+ static void setRightChild(Slot node, Slot rc, ByteBuffer[] frames) {
+ setRightChild(node.getFrameIx(), node.getOffset(), rc.getFrameIx(), rc.getOffset(), frames);
+ }
- static void setRightChild(int nodeFix, int nodeOff, int rcFix, int rcOff,
- ByteBuffer[] frames) {
- storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_FRAME_INDEX,
- FRAME_PTR_SIZE, rcFix);
- storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_OFFSET_INDEX,
- OFFSET_SIZE, rcOff);
- }
+ static void setRightChild(int nodeFix, int nodeOff, int rcFix, int rcOff, ByteBuffer[] frames) {
+ storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, rcFix);
+ storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, rcOff);
+ }
- static int next_fIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return next_fIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
+ static int getNextFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getNextFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
- static int next_offset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return next_offset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
+ static int getNextOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getNextOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
- static int next_fIx(int frameIx, int offset, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + NEXT_FRAME_INDEX,
- FRAME_PTR_SIZE, convertBuffer));
- }
+ static int getNextFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + NEXT_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
+ }
- static int next_offset(int frameIx, int offset, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + NEXT_OFFSET_INDEX,
- OFFSET_SIZE, convertBuffer));
- }
+ static int getNextOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + NEXT_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+ }
- static void setNext(Slot node, Slot next, ByteBuffer[] frames) {
- setNext(node.getFrameIx(), node.getOffset(), next.getFrameIx(),
- node.getOffset(), frames);
- }
+ static void setNext(Slot node, Slot next, ByteBuffer[] frames) { // stores next's (frame index, offset) as node's NEXT pointer
+ setNext(node.getFrameIx(), node.getOffset(), next.getFrameIx(), next.getOffset(), frames); // offset must come from next (was node), as setPrev does with prev
+ }
- static void setNext(int nodeFix, int nodeOff, int nFix, int nOff,
- ByteBuffer[] frames) {
- storeInt(frames[nodeFix], nodeOff + NEXT_FRAME_INDEX, FRAME_PTR_SIZE,
- nFix);
- storeInt(frames[nodeFix], nodeOff + NEXT_OFFSET_INDEX, OFFSET_SIZE,
- nOff);
- }
+ static void setNext(int nodeFix, int nodeOff, int nFix, int nOff, ByteBuffer[] frames) {
+ storeInt(frames[nodeFix], nodeOff + NEXT_FRAME_INDEX, FRAME_PTR_SIZE, nFix);
+ storeInt(frames[nodeFix], nodeOff + NEXT_OFFSET_INDEX, OFFSET_SIZE, nOff);
+ }
- static int prev_fIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return prev_fIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
+ static int getPrevFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getPrevFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
- static int prev_offset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return prev_offset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
+ static int getPrevOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getPrevOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
- static int prev_fIx(int frameIx, int offset, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + PREV_FRAME_INDEX,
- FRAME_PTR_SIZE, convertBuffer));
- }
+ static int getPrevFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + PREV_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
+ }
- static int prev_offset(int frameIx, int offset, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + PREV_OFFSET_INDEX,
- OFFSET_SIZE, convertBuffer));
- }
+ static int getPrevOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + PREV_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+ }
- static void setPrev(Slot node, Slot prev, ByteBuffer[] frames) {
- setPrev(node.getFrameIx(), node.getOffset(), prev.getFrameIx(),
- prev.getOffset(), frames);
- }
+ static void setPrev(Slot node, Slot prev, ByteBuffer[] frames) {
+ setPrev(node.getFrameIx(), node.getOffset(), prev.getFrameIx(), prev.getOffset(), frames);
+ }
- static void setPrev(int nodeFix, int nodeOff, int pFix, int pOff,
- ByteBuffer[] frames) {
- storeInt(frames[nodeFix], nodeOff + PREV_FRAME_INDEX, FRAME_PTR_SIZE,
- pFix);
- storeInt(frames[nodeFix], nodeOff + PREV_OFFSET_INDEX, OFFSET_SIZE,
- pOff);
- }
+ static void setPrev(int nodeFix, int nodeOff, int pFix, int pOff, ByteBuffer[] frames) {
+ storeInt(frames[nodeFix], nodeOff + PREV_FRAME_INDEX, FRAME_PTR_SIZE, pFix);
+ storeInt(frames[nodeFix], nodeOff + PREV_OFFSET_INDEX, OFFSET_SIZE, pOff);
+ }
- static boolean slotsTheSame(Slot s, Slot t) {
- return ((s.getFrameIx() == t.getFrameIx()) && (s.getOffset() == t
- .getOffset()));
- }
+ static boolean slotsTheSame(Slot s, Slot t) {
+ return ((s.getFrameIx() == t.getFrameIx()) && (s.getOffset() == t.getOffset()));
+ }
- static void setHeaderFooter(int frameIx, int offset, int usedLength,
- boolean isFree, ByteBuffer[] frames) {
- int slotLength = getActualLength(usedLength);
- int footerOffset = offset + HEADER_SIZE + slotLength;
- storeInt(frames[frameIx], offset, HEADER_SIZE, usedLength);
- storeInt(frames[frameIx], footerOffset, HEADER_SIZE, usedLength);
- setFree(frameIx, offset, isFree, frames);
- setFree(frameIx, footerOffset, isFree, frames);
- }
+ static void setHeaderFooter(int frameIx, int offset, int usedLength, boolean isFree, ByteBuffer[] frames) {
+ int slotLength = getActualLength(usedLength);
+ int footerOffset = offset + HEADER_SIZE + slotLength;
+ storeInt(frames[frameIx], offset, HEADER_SIZE, usedLength);
+ storeInt(frames[frameIx], footerOffset, HEADER_SIZE, usedLength);
+ setFree(frameIx, offset, isFree, frames);
+ setFree(frameIx, footerOffset, isFree, frames);
+ }
- static int getLength(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getLength(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
+ static int getLength(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getLength(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
- static int getLength(int frameIx, int offset, ByteBuffer[] frames,
- ByteBuffer convertBuffer) {
- convertBuffer.clear();
- for (int i = 0; i < 4 - HEADER_SIZE; i++) { // padding
- convertBuffer.put(i, (byte) 0x00);
- }
+ static int getLength(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ convertBuffer.clear();
+ for (int i = 0; i < 4 - HEADER_SIZE; i++) { // padding
+ convertBuffer.put(i, (byte) 0x00);
+ }
- convertBuffer.put(4 - HEADER_SIZE,
- (byte) ((frames[frameIx].get(offset)) & (MASK)));
- System.arraycopy(frames[frameIx].array(), offset + 1,
- convertBuffer.array(), 5 - HEADER_SIZE, HEADER_SIZE - 1);
- return convertBuffer.getInt(0);
- }
+ convertBuffer.put(4 - HEADER_SIZE, (byte) ((frames[frameIx].get(offset)) & (MASK)));
+ System.arraycopy(frames[frameIx].array(), offset + 1, convertBuffer.array(), 5 - HEADER_SIZE, HEADER_SIZE - 1);
+ return convertBuffer.getInt(0);
+ }
- // MSB equal to 1 means FREE
- static boolean isFree(int frameIx, int offset, ByteBuffer[] frames) {
- return ((((frames[frameIx]).array()[offset]) & 0x80) == 0x80);
- }
+ // MSB equal to 1 means FREE
+ static boolean isFree(int frameIx, int offset, ByteBuffer[] frames) {
+ return ((((frames[frameIx]).array()[offset]) & 0x80) == 0x80);
+ }
- static void setFree(int frameIx, int offset, boolean free,
- ByteBuffer[] frames) {
- if (free) { // set MSB to 1 (for free)
- frames[frameIx].put(offset,
- (byte) (((frames[frameIx]).array()[offset]) | 0x80));
- } else { // set MSB to 0 (for occupied)
- frames[frameIx].put(offset,
- (byte) (((frames[frameIx]).array()[offset]) & 0x7F));
- }
- }
+ static void setFree(int frameIx, int offset, boolean free, ByteBuffer[] frames) {
+ if (free) { // set MSB to 1 (for free)
+ frames[frameIx].put(offset, (byte) (((frames[frameIx]).array()[offset]) | 0x80));
+ } else { // set MSB to 0 (for used)
+ frames[frameIx].put(offset, (byte) (((frames[frameIx]).array()[offset]) & 0x7F));
+ }
+ }
- static int getActualLength(int l) {
- int r = (l + 2 * HEADER_SIZE) % MINIMUM_FREE_SLOT_SIZE;
- return (r == 0 ? l : (l + (BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE - r)));
- }
+ static int getActualLength(int l) {
+ int r = (l + 2 * HEADER_SIZE) % MINIMUM_FREE_SLOT_SIZE;
+ return (r == 0 ? l : (l + (BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE - r)));
+ }
- private static int retrieveAsInt(ByteBuffer b, int fromIndex, int size,
- ByteBuffer convertBuffer) {
- if ((b.get(fromIndex) & INVALID) == INVALID) {
- return INVALID_INDEX;
- }
+ private static int retrieveAsInt(ByteBuffer b, int fromIndex, int size, ByteBuffer convertBuffer) {
+ if ((b.get(fromIndex) & INVALID) == INVALID) {
+ return INVALID_INDEX;
+ }
- convertBuffer.clear();
- for (int i = 0; i < 4 - size; i++) { // padding
- convertBuffer.put(i, (byte) 0x00);
- }
+ convertBuffer.clear();
+ for (int i = 0; i < 4 - size; i++) { // padding
+ convertBuffer.put(i, (byte) 0x00);
+ }
- System.arraycopy(b.array(), fromIndex, convertBuffer.array(), 4 - size,
- size);
- return convertBuffer.getInt(0);
- }
+ System.arraycopy(b.array(), fromIndex, convertBuffer.array(), 4 - size, size);
+ return convertBuffer.getInt(0);
+ }
- private static void storeInt(ByteBuffer b, int fromIndex, int size,
- int value) {
- if (value == INVALID_INDEX) {
- b.put(fromIndex, INVALID);
- return;
- }
- for (int i = 0; i < size; i++) {
- b.put(fromIndex + i,
- (byte) ((value >>> (8 * ((size - 1 - i)))) & 0xff));
- }
- }
+ private static void storeInt(ByteBuffer b, int fromIndex, int size, int value) {
+ if (value == INVALID_INDEX) {
+ b.put(fromIndex, INVALID);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ b.put(fromIndex + i, (byte) ((value >>> (8 * ((size - 1 - i)))) & 0xff));
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
index 1cc15cf..b45c5f9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
@@ -15,57 +15,56 @@
public interface IMemoryManager {
- /**
- * Allocates a free slot equal or greater than requested length. Pointer to
- * the allocated slot is put in result, and gets returned to the caller. If
- * no proper free slot is available, result would contain the null/invalid
- * pointer (may vary between different implementations)
- *
- * @param length
- * @param result
- * @throws HyracksDataException
- */
- void allocate(int length, Slot result) throws HyracksDataException;
+ /**
+ * Allocates a free slot equal or greater than requested length. Pointer to
+ * the allocated slot is put in result, and gets returned to the caller. If
+ * no proper free slot is available, result would contain a null/invalid
+ * pointer (may vary between different implementations)
+ *
+ * @param length
+ * @param result
+ * @throws HyracksDataException
+ */
+ void allocate(int length, Slot result) throws HyracksDataException;
- /**
- * Unallocates the specified slot (and returns it back to the free slots
- * set)
- *
- * @param s
- * @return the total length of unallocted slot
- * @throws HyracksDataException
- */
- int unallocate(Slot s) throws HyracksDataException;
+ /**
+ * Unallocates the specified slot (and returns it back to the free slots
+ * set)
+ *
+ * @param s
+ * @return the total length of the unallocated slot
+ * @throws HyracksDataException
+ */
+ int unallocate(Slot s) throws HyracksDataException;
- /**
- * @param frameIndex
- * @return the specified frame, from the set of memory buffers, being
- * managed by this memory manager
- */
- ByteBuffer getFrame(int frameIndex);
+ /**
+ * @param frameIndex
+ * @return the specified frame, from the set of memory buffers, being
+ * managed by this memory manager
+ */
+ ByteBuffer getFrame(int frameIndex);
- /**
- * Writes the specified tuple into the specified memory slot (denoted by
- * frameIX and offset)
- *
- * @param frameIx
- * @param offset
- * @param src
- * @param tIndex
- * @return
- */
- boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src,
- int tIndex);
+ /**
+ * Writes the specified tuple into the specified memory slot (denoted by
+ * frameIx and offset)
+ *
+ * @param frameIx
+ * @param offset
+ * @param src
+ * @param tIndex
+ * @return
+ */
+ boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src, int tIndex);
- /**
- * Reads the specified tuple (denoted by frameIx and offset) and appends it
- * to the passed FrameTupleAppender
- *
- * @param frameIx
- * @param offset
- * @param dest
- * @return
- */
- boolean readTuple(int frameIx, int offset, FrameTupleAppender dest);
+ /**
+ * Reads the specified tuple (denoted by frameIx and offset) and appends it
+ * to the passed FrameTupleAppender
+ *
+ * @param frameIx
+ * @param offset
+ * @param dest
+ * @return
+ */
+ boolean readTuple(int frameIx, int offset, FrameTupleAppender dest);
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
index 1d55bc8..7568e26 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -8,12 +8,12 @@
/**
* @author pouria
*
- * Inteface for the Run Generator
+ * Interface for the Run Generator
*/
public interface IRunGenerator extends IFrameWriter {
- /**
- *
- * @return the list of generated runs, each stored sorted
- */
- public List<IFrameReader> getRuns();
+
+ /**
+ * @return the list of generated (sorted) runs
+ */
+ public List<IFrameReader> getRuns();
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
index 2cc731c..73407f3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
@@ -1,53 +1,71 @@
package edu.uci.ics.hyracks.dataflow.std.sort;
/**
- * @author pouria Defines the selection tree, used in sorting with replacement
- * selection to manage the order of outputing tuples into the runs,
- * during the run generation phase. At each point of time, this tree
- * contains tuples, belonging to two different runs: - Current run,
- * being written to the output - Next run
+ * @author pouria
+ *
+ * Defines the selection tree, used in sorting with replacement
+ * selection to manage the order of output tuples into the runs, during
+ * the run generation phase. This tree contains tuples, belonging to two
+ * different runs: - Current run (being written to the output) - Next
+ * run
*/
+
public interface ISelectionTree {
- /**
- * Inserts a new element into the selection.
- *
- * @param element
- * contains the pointer to the memory slot, containing the tuple,
- * along with its run number
- */
- void insert(int[] element);
- /**
- * @return Removes and returns the smallest elemnt in the tree
- */
- int[] getMin();
+ /**
+ * Inserts a new element into the selectionTree
+ *
+ * @param element
+ * contains the pointer to the memory slot, containing the tuple,
+ * along with its run number
+ */
+ void insert(int[] element);
- /**
- * @return Removes and returns the largest elemnt in the tree
- */
- int[] getMax();
+ /**
+ * Removes and returns the smallest element in the tree
+ *
+ * @param result
+ * is the array that will eventually contain minimum entry
+ * pointer
+ */
+ void getMin(int[] result);
- /**
- * @return True of the selection tree does not have any element, and false
- * otherwise.
- */
- boolean isEmpty();
+ /**
+ * Removes and returns the largest element in the tree
+ *
+ * @param result
+ * is the array that will eventually contain maximum entry
+ * pointer
+ */
+ void getMax(int[] result);
- /**
- * Removes all the elements in the tree
- */
- void reset();
+ /**
+ * @return True if the selection tree does not have any element, false
+ * otherwise
+ */
+ boolean isEmpty();
- /**
- *
- * @return Returns, but does NOT remove, the smallest element in the tree
- */
- int[] peekMin();
+ /**
+ * Removes all the elements in the tree
+ */
+ void reset();
- /**
- *
- * @return Returns, but does NOT remove, the largest element in the tree
- */
- int[] peekMax();
+ /**
+ * Returns (and does NOT remove) the smallest element in the tree
+ *
+ * @param result
+ * is the array that will eventually contain minimum entry
+ * pointer
+ */
+ void peekMin(int[] result);
+
+ /**
+ * Returns (and does NOT remove) the largest element in the tree
+ *
+ * @param result
+ * is the array that will eventually contain maximum entry
+ * pointer
+ */
+ void peekMax(int[] result);
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index e617ccb..2f1c94c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -30,7 +30,7 @@
/**
* @author pouria
*
- * Operator descriptor for sorting with replacement. Consists of two
+ * Operator descriptor for sorting with replacement, consisting of two
* phases:
*
* - Run Generation: Denoted by OptimizedSortActivity below, in which
@@ -42,188 +42,163 @@
* - Merging: Denoted by MergeActivity below, in which runs (generated
* in the previous phase) get merged via a merger. Each run has a single
* buffer in memory, and a priority queue is used to select the top
- * tuple each time and send it to the new run/output
+ * tuple each time. Top tuple is sent to a new run or output
*/
-public class OptimizedExternalSortOperatorDescriptor extends
- AbstractOperatorDescriptor {
- private static final int NO_LIMIT = -1;
+public class OptimizedExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final int NO_LIMIT = -1;
+ private static final long serialVersionUID = 1L;
+ private static final int SORT_ACTIVITY_ID = 0;
+ private static final int MERGE_ACTIVITY_ID = 1;
- private static final int SORT_ACTIVITY_ID = 0;
- private static final int MERGE_ACTIVITY_ID = 1;
+ private final int[] sortFields;
+ private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final int memSize;
+ private final int outputLimit;
- private final int[] sortFields;
- private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final int memSize;
- private final int outputLimit;
+ public OptimizedExternalSortOperatorDescriptor(JobSpecification spec, int framesLimit, int[] sortFields,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+ this(spec, framesLimit, NO_LIMIT, sortFields, null, comparatorFactories, recordDescriptor);
+ }
- public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
- int framesLimit, int[] sortFields,
- IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) {
- this(spec, framesLimit, NO_LIMIT, sortFields, null,
- comparatorFactories, recordDescriptor);
- }
+ public OptimizedExternalSortOperatorDescriptor(JobSpecification spec, int framesLimit, int outputLimit,
+ int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+ this(spec, framesLimit, outputLimit, sortFields, null, comparatorFactories, recordDescriptor);
+ }
- public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
- int framesLimit, int outputLimit, int[] sortFields,
- IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) {
- this(spec, framesLimit, outputLimit, sortFields, null,
- comparatorFactories, recordDescriptor);
- }
+ public OptimizedExternalSortOperatorDescriptor(JobSpecification spec, int memSize, int outputLimit,
+ int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+ super(spec, 1, 1); // presumably (input arity, output arity) = (1, 1) - TODO confirm against AbstractOperatorDescriptor
+ this.memSize = memSize; // the other constructors pass their framesLimit argument here
+ this.outputLimit = outputLimit; // NO_LIMIT selects the run generator without an output limit
+ this.sortFields = sortFields;
+ this.firstKeyNormalizerFactory = firstKeyNormalizerFactory; // null when called via the shorter constructors
+ this.comparatorFactories = comparatorFactories;
+ if (memSize <= 1) { // validated after the field assignments above
+ throw new IllegalStateException();// minimum of 2 frames (1 in, 1 out)
+ }
+ recordDescriptors[0] = recordDescriptor;
+ }
- public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
- int memSize, int outputLimit, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory,
- IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) {
- super(spec, 1, 1);
- this.memSize = memSize;
- this.outputLimit = outputLimit;
- this.sortFields = sortFields;
- this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
- this.comparatorFactories = comparatorFactories;
- if (memSize <= 1) {
- throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
- }
- recordDescriptors[0] = recordDescriptor;
- }
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+ OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(
- odId, SORT_ACTIVITY_ID));
- OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(
- odId, MERGE_ACTIVITY_ID));
+ builder.addActivity(osa);
+ builder.addSourceEdge(0, osa, 0);
- builder.addActivity(osa);
- builder.addSourceEdge(0, osa, 0);
+ builder.addActivity(oma);
+ builder.addTargetEdge(0, oma, 0);
- builder.addActivity(oma);
- builder.addTargetEdge(0, oma, 0);
+ builder.addBlockingEdge(osa, oma);
+ }
- builder.addBlockingEdge(osa, oma);
- }
+ public static class OptimizedSortTaskState extends AbstractTaskState {
+ private List<IFrameReader> runs;
- public static class OptimizedSortTaskState extends AbstractTaskState {
- private List<IFrameReader> runs;
+ public OptimizedSortTaskState() {
+ }
- public OptimizedSortTaskState() {
- }
+ private OptimizedSortTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
- private OptimizedSortTaskState(JobId jobId, TaskId taskId) {
- super(jobId, taskId);
- }
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
- @Override
- public void toBytes(DataOutput out) throws IOException {
+ }
- }
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
- @Override
- public void fromBytes(DataInput in) throws IOException {
+ }
+ }
- }
- }
+ private class OptimizedSortActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
- private class OptimizedSortActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
+ public OptimizedSortActivity(ActivityId id) {
+ super(id);
+ }
- public OptimizedSortActivity(ActivityId id) {
- super(id);
- }
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ final IRunGenerator runGen;
+ if (outputLimit == NO_LIMIT) {
+ runGen = new OptimizedExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, recordDescriptors[0], memSize);
+ } else {
+ runGen = new OptimizedExternalSortRunGeneratorWithLimit(ctx, sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, recordDescriptors[0], memSize, outputLimit);
+ }
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions) {
- final IRunGenerator runGen;
- if (outputLimit == NO_LIMIT) {
- runGen = new OptimizedExternalSortRunGenerator(ctx, sortFields,
- firstKeyNormalizerFactory, comparatorFactories,
- recordDescriptors[0], memSize);
- } else {
- runGen = new OptimizedExternalSortRunGeneratorWithLimit(ctx,
- sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDescriptors[0], memSize,
- outputLimit);
- }
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ @Override
+ public void open() throws HyracksDataException {
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- @Override
- public void open() throws HyracksDataException {
+ runGen.open();
+ }
- runGen.open();
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ runGen.nextFrame(buffer);
+ }
- @Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
- runGen.nextFrame(buffer);
- }
+ @Override
+ public void close() throws HyracksDataException {
+ OptimizedSortTaskState state = new OptimizedSortTaskState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition));
+ runGen.close();
+ state.runs = runGen.getRuns();
+ env.setTaskState(state);
- @Override
- public void close() throws HyracksDataException {
- OptimizedSortTaskState state = new OptimizedSortTaskState(
- ctx.getJobletContext().getJobId(), new TaskId(
- getActivityId(), partition));
- runGen.close();
- state.runs = runGen.getRuns();
- env.setTaskState(state);
+ }
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ runGen.fail();
+ }
+ };
+ return op;
+ }
+ }
- @Override
- public void fail() throws HyracksDataException {
- runGen.fail();
- }
- };
- return op;
- }
- }
+ private class OptimizedMergeActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
- private class OptimizedMergeActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
+ public OptimizedMergeActivity(ActivityId id) {
+ super(id);
+ }
- public OptimizedMergeActivity(ActivityId id) {
- super(id);
- }
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ OptimizedSortTaskState state = (OptimizedSortTaskState) env.getTaskState(new TaskId(new ActivityId(
+ getOperatorId(), SORT_ACTIVITY_ID), partition));
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx, final IOperatorEnvironment env,
- IRecordDescriptorProvider recordDescProvider,
- final int partition, int nPartitions) {
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- @Override
- public void initialize() throws HyracksDataException {
- OptimizedSortTaskState state = (OptimizedSortTaskState) env
- .getTaskState(new TaskId(new ActivityId(
- getOperatorId(), SORT_ACTIVITY_ID),
- partition));
+ List<IFrameReader> runs = state.runs;
- List<IFrameReader> runs = state.runs;
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
- IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i]
- .createBinaryComparator();
- }
+ OptimizedExternalSortRunMerger merger = new OptimizedExternalSortRunMerger(ctx, outputLimit, runs,
+ sortFields, comparators, recordDescriptors[0], memSize, writer);
- OptimizedExternalSortRunMerger merger = new OptimizedExternalSortRunMerger(
- ctx, outputLimit, runs, sortFields, comparators,
- recordDescriptors[0], memSize, writer);
+ merger.process();
- merger.process();
-
- }
- };
- return op;
- }
- }
+ }
+ };
+ return op;
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
index 13ba5c7..1430e6f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
@@ -30,284 +30,277 @@
* memory.
*
* The overall process is as follows: - Read the input data frame by
- * frame. For each tuple T in the current frame: - Try to allocate a
- * memory slot for writing T along with the attached header/footer (for
- * memory management purpose) - If T can not be allocated, try to output
- * as many as tuples, currently resident in memory, so that a free slot,
- * large enough to hold T, gets created. MinHeap decides about which
- * tuple should be outputed at each step. Write T into the memory. -
- * Calculate the runID of T (based on the last output tuple for the
+ * frame. For each tuple T in the current frame:
+ *
+ * - Try to allocate a memory slot for writing T along with the attached
+ * header/footer (for memory management purpose)
+ *
+ * - If T can not be allocated, try to output as many tuples, currently
+ * resident in memory, as needed so that a free slot, large enough to
+ * hold T, gets created. MinHeap decides about which tuple should be
+ * sent to the output at each step.
+ *
+ * - Write T into the memory
+ *
+ * - Calculate the runID of T (based on the last output tuple for the
* current run). It is either the current run or the next run. Also
* calculate Poorman's Normalized Key (PNK) for T, to make comparisons
- * faster later. - Create an heap element for T, containing its runID,
- * the slot ptr to its memory location, and its PNK. - Insert the heap
- * element into the heap. - Upon closing, write all the tuples,
- * currently resident in memory, into their corresponding run(s). Again
- * min heap decides about which tuple is the next for output.
+ * faster later.
+ *
+ * - Create a heap element for T, containing: its runID, the slot
+ * pointer to its memory location, and its PNK.
+ *
+ * - Insert the created heap element into the heap
+ *
+ * - Upon closing, write all the tuples, currently resident in memory,
+ * into their corresponding run(s). Again min heap decides about which
+ * tuple is the next for output.
*
* OptimizedSortOperatorDescriptor will merge the generated runs, to
- * generate the final sorted output opf the data.
+ * generate the final sorted output of the data.
*/
public class OptimizedExternalSortRunGenerator implements IRunGenerator {
- private final IHyracksTaskContext ctx;
- private final int[] sortFields;
- private final INormalizedKeyComputer nkc;
- IBinaryComparatorFactory[] comparatorFactories;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDescriptor;
- private final List<IFrameReader> runs;
+ private final IHyracksTaskContext ctx;
+ private final int[] sortFields;
+ private final INormalizedKeyComputer nkc;
+ IBinaryComparatorFactory[] comparatorFactories;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDescriptor;
+ private final List<IFrameReader> runs;
- private ISelectionTree sTree;
- private IMemoryManager memMgr;
+ private ISelectionTree sTree;
+ private IMemoryManager memMgr;
- private final int memSize;
- private FrameTupleAccessor inputAccessor; // Used to read tuples in
- // nextFrame()
- private FrameTupleAppender outputAppender; // Used to write tuple to the
- // dedicated output buffer
- private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
- // into run(s)
- private FrameTupleAccessor lastRecordAccessor; // Used to read last output
- // record from the output
- // buffer
- private int curRunSize;
- private int lastTupleIx; // Holds index of last output tuple in the
- // dedicated output buffer
- private Slot allocationPtr; // Contains the ptr to the allocated memory slot
- // by the memory manager for the new tuple
- private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
- // the selectionTree to output
- private int[] sTreeTop;
+ private final int memSize;
+ private FrameTupleAccessor inputAccessor; // Used to read tuples in
+ // nextFrame()
+ private FrameTupleAppender outputAppender; // Used to write tuple to the
+ // dedicated output buffer
+ private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
+ // into run(s)
+ private FrameTupleAccessor lastRecordAccessor; // Used to read last output
+ // record from the output
+ // buffer
+ private int curRunSize;
+ private int lastTupleIx; // Holds index of last output tuple in the
+ // dedicated output buffer
+ private Slot allocationPtr; // Contains the ptr to the allocated memory slot
+ // by the memory manager for the new tuple
+ private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
+ // the selectionTree to output
+ private int[] sTreeTop;
- RunFileWriter writer;
+ RunFileWriter writer;
- private boolean newRun;
- private int curRunId;
+ private boolean newRun;
+ private int curRunId;
- public OptimizedExternalSortRunGenerator(IHyracksTaskContext ctx,
- int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory,
- IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDesc, int memSize) {
- this.ctx = ctx;
- this.sortFields = sortFields;
- nkc = firstKeyNormalizerFactory == null ? null
- : firstKeyNormalizerFactory.createNormalizedKeyComputer();
- this.comparatorFactories = comparatorFactories;
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- this.recordDescriptor = recordDesc;
- this.runs = new LinkedList<IFrameReader>();
- this.memSize = memSize;
- }
+ public OptimizedExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, int memSize) {
+ this.ctx = ctx;
+ this.sortFields = sortFields;
+ nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ this.comparatorFactories = comparatorFactories;
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ this.recordDescriptor = recordDesc;
+ this.runs = new LinkedList<IFrameReader>();
+ this.memSize = memSize;
+ }
- @Override
- public void open() throws HyracksDataException {
- runs.clear();
- inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescriptor);
- outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputBuffer = ctx.allocateFrame();
- outputAppender.reset(outputBuffer, true);
- lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescriptor);
+ @Override
+ public void open() throws HyracksDataException {
+ runs.clear();
+ inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputBuffer = ctx.allocateFrame();
+ outputAppender.reset(outputBuffer, true);
+ lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- this.memMgr = new BSTMemMgr(ctx, memSize);
- this.sTree = new SortMinHeap(ctx, sortFields, comparatorFactories,
- recordDescriptor, memMgr);
- this.allocationPtr = new Slot();
- this.outputedTuple = new Slot();
- this.sTreeTop = new int[] { -1, -1, -1, -1 };
- curRunId = -1;
- openNewRun();
- }
+ this.memMgr = new BSTMemMgr(ctx, memSize);
+ this.sTree = new SortMinHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
+ this.allocationPtr = new Slot();
+ this.outputedTuple = new Slot();
+ this.sTreeTop = new int[] { -1, -1, -1, -1 };
+ curRunId = -1;
+ openNewRun();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inputAccessor.reset(buffer);
- byte[] bufferArray = buffer.array();
- int tupleCount = inputAccessor.getTupleCount();
- for (int i = 0; i < tupleCount; ++i) {
- allocationPtr.clear();
- int tLength = inputAccessor.getTupleEndOffset(i)
- - inputAccessor.getTupleStartOffset(i);
- memMgr.allocate(tLength, allocationPtr);
- while (allocationPtr.isNull()) {
- int unAllocSize = -1;
- while (unAllocSize < tLength) {
- unAllocSize = outputRecord();
- if (unAllocSize < 1) {
- throw new HyracksDataException(
- "Unable to allocate space for the new tuple, while there is no more tuple to output");
- }
- }
- memMgr.allocate(tLength, allocationPtr);
- ((BSTMemMgr) memMgr)._debug_decLookupCount();
- }
- memMgr.writeTuple(allocationPtr.getFrameIx(),
- allocationPtr.getOffset(), inputAccessor, i);
- int runId = getRunId(inputAccessor, i);
- int pnk = getPNK(inputAccessor, i, bufferArray);
- int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
- allocationPtr.getOffset(), pnk };
- sTree.insert(entry);
- }
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inputAccessor.reset(buffer);
+ byte[] bufferArray = buffer.array();
+ int tupleCount = inputAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; ++i) {
+ allocationPtr.clear();
+ int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
+ memMgr.allocate(tLength, allocationPtr);
+ while (allocationPtr.isNull()) {
+ int unAllocSize = -1;
+ while (unAllocSize < tLength) {
+ unAllocSize = outputRecord();
+ if (unAllocSize < 1) {
+ throw new HyracksDataException(
+ "Unable to allocate space for the new tuple, while there is no more tuple to output");
+ }
+ }
+ memMgr.allocate(tLength, allocationPtr);
+ ((BSTMemMgr) memMgr).debugDecLookupCount();
+ }
+ memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
+ int runId = getRunId(inputAccessor, i);
+ int pnk = getPNK(inputAccessor, i, bufferArray);
+ int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
+ sTree.insert(entry);
+ }
+ }
- @Override
- public void fail() throws HyracksDataException {
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ }
- @Override
- public void close() throws HyracksDataException {
- while (!sTree.isEmpty()) { // Outputting remaining elements in the
- // selectionTree
- outputRecord();
- }
- if (outputAppender.getTupleCount() > 0) { // Writing out very last
- // resident records to file
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- outputAppender.reset(outputBuffer, true);
- writer.close();
- runs.add(writer.createReader());
- }
+ @Override
+ public void close() throws HyracksDataException {
+ while (!sTree.isEmpty()) { // Outputting remaining elements in the
+ // selectionTree
+ outputRecord();
+ }
+ if (outputAppender.getTupleCount() > 0) { // Writing out very last
+ // resident records to file
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ outputAppender.reset(outputBuffer, true);
+ writer.close();
+ runs.add(writer.createReader());
+ }
- public List<IFrameReader> getRuns() {
- return runs;
- }
+ public List<IFrameReader> getRuns() {
+ return runs;
+ }
- private int outputRecord() throws HyracksDataException {
- outputedTuple.clear();
- sTreeTop = sTree.getMin();
- if (!isEntryValid(sTreeTop)) {
- throw new HyracksDataException(
- "Invalid outputed tuple (Top of the selection tree is invalid)");
- }
+ private int outputRecord() throws HyracksDataException {
+ outputedTuple.clear();
+ sTree.getMin(sTreeTop);
+ if (!isEntryValid(sTreeTop)) {
+ throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
+ }
- if (sTreeTop[SortMinHeap.RUN_ID_IX] != curRunId) { // We need to switch
- // runs
- openNewRun();
- }
+ if (sTreeTop[SortMinHeap.RUN_ID_IX] != curRunId) { // We need to switch
+ // runs
+ openNewRun();
+ }
- int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
- int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
- // append to
- // the
- // tupleAppender
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
- throw new HyracksDataException(
- "Can not append to the ouput buffer in sort");
- }
- lastTupleIx = 0;
- } else {
- lastTupleIx++;
- }
- outputedTuple.set(tFrameIx, tOffset);
- newRun = false;
- curRunSize++;
- return memMgr.unallocate(outputedTuple);
+ int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+ int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
+ // append to
+ // the
+ // tupleAppender
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ throw new HyracksDataException("Can not append to the ouput buffer in sort");
+ }
+ lastTupleIx = 0;
+ } else {
+ lastTupleIx++;
+ }
+ outputedTuple.set(tFrameIx, tOffset);
+ newRun = false;
+ curRunSize++;
+ return memMgr.unallocate(outputedTuple);
- }
+ }
- private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) { // Moved
- // buffInArray
- // out
- // for
- // better
- // performance
- // (not
- // converting
- // for
- // each
- // and
- // every
- // tuple)
- int sfIdx = sortFields[0];
- int tStart = fta.getTupleStartOffset(tIx);
- int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
- int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
- int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
- return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel
- - f0StartRel));
- }
+ private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) { // Moved
+ // buffInArray
+ // out
+ // for
+ // better
+ // performance
+ // (not
+ // converting
+ // for
+ // each
+ // and
+ // every
+ // tuple)
+ int sfIdx = sortFields[0];
+ int tStart = fta.getTupleStartOffset(tIx);
+ int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
+ int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
+ int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
+ return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel - f0StartRel));
+ }
- private int getRunId(FrameTupleAccessor fta, int tupIx) { // Comparing
- // current
- // record to
- // last output
- // record, it
- // decided about
- // current
- // record's
- // runId
- if (newRun) { // Very first record for a new run
- return curRunId;
- }
+ private int getRunId(FrameTupleAccessor fta, int tupIx) { // Comparing
+ // current
+ // record to
+ // last output
+ // record, it
+ // decides about
+ // current
+ // record's
+ // runId
+ if (newRun) { // Very first record for a new run
+ return curRunId;
+ }
- byte[] lastRecBuff = outputBuffer.array();
- lastRecordAccessor.reset(outputBuffer);
- int lastStartOffset = lastRecordAccessor
- .getTupleStartOffset(lastTupleIx);
+ byte[] lastRecBuff = outputBuffer.array();
+ lastRecordAccessor.reset(outputBuffer);
+ int lastStartOffset = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
- ByteBuffer fr2 = fta.getBuffer();
- byte[] curRecBuff = fr2.array();
- int r2StartOffset = fta.getTupleStartOffset(tupIx);
+ ByteBuffer fr2 = fta.getBuffer();
+ byte[] curRecBuff = fr2.array();
+ int r2StartOffset = fta.getTupleStartOffset(tupIx);
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset
- + (fIdx - 1) * 4);
- int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
- int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength()
- + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
- * 4);
- int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
- int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
- int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2,
- l2);
- if (c != 0) {
- if (c <= 0) {
- return curRunId;
- } else {
- return (curRunId + 1);
- }
- }
- }
- return curRunId;
- }
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset + (fIdx - 1) * 4);
+ int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
+ int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength() + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
+ int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+ int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+ int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2, l2);
+ if (c != 0) {
+ if (c <= 0) {
+ return curRunId;
+ } else {
+ return (curRunId + 1);
+ }
+ }
+ }
+ return curRunId;
+ }
- private void openNewRun() throws HyracksDataException {
- if (writer != null) { // There is a prev run, so flush its tuples and
- // close it first
- if (outputAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- outputAppender.reset(outputBuffer, true);
- writer.close();
- runs.add(writer.createReader());
- }
+ private void openNewRun() throws HyracksDataException {
+ if (writer != null) { // There is a prev run, so flush its tuples and
+ // close it first
+ if (outputAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ outputAppender.reset(outputBuffer, true);
+ writer.close();
+ runs.add(writer.createReader());
+ }
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalSortRunGenerator.class.getSimpleName());
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- curRunId++;
- newRun = true;
- curRunSize = 0;
- lastTupleIx = -1;
- }
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalSortRunGenerator.class.getSimpleName());
+ writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ curRunId++;
+ newRun = true;
+ curRunSize = 0;
+ lastTupleIx = -1;
+ }
- private boolean isEntryValid(int[] entry) {
- return ((entry[SortMinHeap.RUN_ID_IX] > -1)
- && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
- }
+ private boolean isEntryValid(int[] entry) {
+ return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
index df40577..f69f676 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
@@ -24,37 +24,51 @@
*
* This class implements the run generator for sorting with replacement
* selection, where there is a limit on the output, i.e. we are looking
- * for top-k tuples (first k smallest tuples w.r.t sorting keys), if
- * data were sorted, where k is a specific limit, decided in advance.
+ * for top-k tuples (first k smallest tuples w.r.t sorting keys).
*
* A SortMinMaxHeap is used as the selectionTree to decide the order of
* writing tuples into the runs, and also to prune tuples (if possible).
- * Memory manager is based on a binary search tree to allocate tuples.
+ * Memory manager is based on a binary search tree and is used to
+ * allocate memory slots for tuples.
*
- * The overall process is as follows (Assuming that the limit is K): -
- * Read the input data frame by frame. For each tuple T in the current
- * frame: - If currentRun R has reached the limit of K on the size, and
- * (T > maximum tuple of R), then ignore T. - Otherwise, try to allocate
- * a memory slot for writing T along with the attached header/footer
- * (for memory management purpose) - If T can not be allocated, try to
- * output as many as tuples, currently resident in memory, so that a
- * free slot, large enough to hold T, gets created. MinMaxHeap decides
- * about which tuple should be outputed at each step. Write T into the
- * memory. - Calculate the runID of T (based on the last output tuple
- * for the current run). It is either the current run or the next run.
- * Also calculate Poorman's Normalized Key (PNK) for T, to make
- * comparisons faster later. - Create an heap element for T, containing
- * its runID, the slot ptr to its memory location, and its PNK. - If
- * runID is the nextRun, insert the heap element into the heap, and
- * increment the size of nextRun in the stats. - If runID is the
- * currentRun: - If currentRun has not hit the limit of k, insert the
- * element into the heap, and increase currentRun size in the stats. -
- * (arriving here, currentRun has hit the limit of K, while T is less
- * than the max). Discard the current max for the current run (by
- * pop-ing it from the heap and unallocating its memory location) and
- * insert the heap element into the heap. No need to change the
- * currentRun size as we are replacing an old element (the old max) with
- * T.
+ * The overall process is as follows (Assuming that the limit is K):
+ *
+ * - Read the input data frame by frame. For each tuple T in the current
+ * frame:
+ *
+ * - If currentRun R has reached the limit of K on the size, and (T >
+ * maximum tuple of R), then ignore T.
+ *
+ * - Otherwise, try to allocate a memory slot for writing T along with
+ * the attached header/footer (for memory management purpose)
+ *
+ * - If T can not be allocated, try to output as many tuples, currently
+ * resident in memory, as needed so that a free slot, large enough to
+ * hold T, gets created. MinMaxHeap decides about which tuple should be
+ * sent to the output at each step.
+ *
+ * - Write T into memory.
+ *
+ * - Calculate the runID of T (based on the last output tuple for the
+ * current run). It is either the current run or the next run. Also
+ * calculate Poorman's Normalized Key (PNK) for T, to make comparisons
+ * faster later.
+ *
+ * - Create a heap element for T, containing its runID, the slot ptr to
+ * its memory location, and its PNK.
+ *
+ * - If runID is the nextRun, insert the heap element into the heap, and
+ * increment the size of nextRun.
+ *
+ * - If runID is the currentRun, then:
+ *
+ * - If currentRun has not hit the limit of K, insert the element into
+ * the heap, and increase currentRun size.
+ * - Otherwise, currentRun has hit the limit of K, while T is less than
+ * the max. So discard the current max for the current run (by popping
+ * it from the heap and unallocating its memory location) and insert
+ * the heap element into the heap. No need to change the currentRun
+ * size as we are replacing an old element (the old max) with T.
*
* - Upon closing, write all the tuples, currently resident in memory,
* into their corresponding run(s).
@@ -64,412 +78,378 @@
* resident in memory) get discarded at the beginning. MinMax heap can
* be used to find these tuples.
*/
-public class OptimizedExternalSortRunGeneratorWithLimit implements
- IRunGenerator {
+public class OptimizedExternalSortRunGeneratorWithLimit implements IRunGenerator {
- private final IHyracksTaskContext ctx;
- private final int[] sortFields;
- private final INormalizedKeyComputer nkc;
- IBinaryComparatorFactory[] comparatorFactories;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDescriptor;
- private final List<IFrameReader> runs;
+ private final IHyracksTaskContext ctx;
+ private final int[] sortFields;
+ private final INormalizedKeyComputer nkc;
+ IBinaryComparatorFactory[] comparatorFactories;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDescriptor;
+ private final List<IFrameReader> runs;
- private ISelectionTree sTree;
- private IMemoryManager memMgr;
+ private ISelectionTree sTree;
+ private IMemoryManager memMgr;
- private final int memSize;
- private FrameTupleAccessor inputAccessor; // Used to read tuples in
- // nextFrame()
- private FrameTupleAppender outputAppender; // Used to write tuple to the
- // dedicated output buffer
- private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
- // into run(s)
- private FrameTupleAccessor lastRecordAccessor; // Used to read last output
- // record from the output
- // buffer
- private FrameTupleAccessor fta2; // Used to read max record
- private final int outputLimit;
- private int curRunSize;
- private int nextRunSize;
- private int lastTupleIx; // Holds index of last output tuple in the
- // dedicated output buffer
- private Slot allocationPtr; // Contains the ptr to the allocated memory slot
- // by the memory manager for the new tuple
- private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
- // the selectionTree to output
- private Slot discard;
- private int[] sTreeTop;
- private int[] peek;
- RunFileWriter writer;
- private boolean newRun;
- private int curRunId;
+ private final int memSize;
+ private FrameTupleAccessor inputAccessor; // Used to read tuples in
+ // nextFrame()
+ private FrameTupleAppender outputAppender; // Used to write tuple to the
+ // dedicated output buffer
+ private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
+ // into run(s)
+ private FrameTupleAccessor lastRecordAccessor; // Used to read last output
+ // record from the output
+ // buffer
+ private FrameTupleAccessor fta2; // Used to read max record
+ private final int outputLimit;
+ private int curRunSize;
+ private int nextRunSize;
+ private int lastTupleIx; // Holds index of last output tuple in the
+ // dedicated output buffer
+ private Slot allocationPtr; // Contains the ptr to the allocated memory slot
+ // by the memory manager for the new tuple
+ private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
+ // the selectionTree to output
+ private Slot discard;
+ private int[] sTreeTop;
+ private int[] peek;
+ RunFileWriter writer;
+ private boolean newRun;
+ private int curRunId;
- public OptimizedExternalSortRunGeneratorWithLimit(IHyracksTaskContext ctx,
- int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory,
- IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDesc, int memSize, int limit) {
+ public OptimizedExternalSortRunGeneratorWithLimit(IHyracksTaskContext ctx, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, int memSize, int limit) {
- this.ctx = ctx;
- this.sortFields = sortFields;
- nkc = firstKeyNormalizerFactory == null ? null
- : firstKeyNormalizerFactory.createNormalizedKeyComputer();
- this.comparatorFactories = comparatorFactories;
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- this.recordDescriptor = recordDesc;
- this.runs = new LinkedList<IFrameReader>();
- this.memSize = memSize;
+ this.ctx = ctx;
+ this.sortFields = sortFields;
+ nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ this.comparatorFactories = comparatorFactories;
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ this.recordDescriptor = recordDesc;
+ this.runs = new LinkedList<IFrameReader>();
+ this.memSize = memSize;
- this.outputLimit = limit;
- }
+ this.outputLimit = limit;
+ }
- @Override
- public void open() throws HyracksDataException {
- runs.clear();
- inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescriptor);
- outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputBuffer = ctx.allocateFrame();
- outputAppender.reset(outputBuffer, true);
- lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescriptor);
- fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- this.memMgr = new BSTMemMgr(ctx, memSize);
- this.sTree = new SortMinMaxHeap(ctx, sortFields, comparatorFactories,
- recordDescriptor, memMgr);
- this.allocationPtr = new Slot();
- this.outputedTuple = new Slot();
- this.sTreeTop = new int[] { -1, -1, -1, -1 };
- this.peek = new int[] { -1, -1, -1, -1 };
- this.discard = new Slot();
+ @Override
+ public void open() throws HyracksDataException {
+ runs.clear();
+ inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputBuffer = ctx.allocateFrame();
+ outputAppender.reset(outputBuffer, true);
+ lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ this.memMgr = new BSTMemMgr(ctx, memSize);
+ this.sTree = new SortMinMaxHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
+ this.allocationPtr = new Slot();
+ this.outputedTuple = new Slot();
+ this.sTreeTop = new int[] { -1, -1, -1, -1 };
+ this.peek = new int[] { -1, -1, -1, -1 };
+ this.discard = new Slot();
- curRunId = -1;
- curRunSize = 0;
- nextRunSize = 0;
- openNewRun();
- }
+ curRunId = -1;
+ curRunSize = 0;
+ nextRunSize = 0;
+ openNewRun();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inputAccessor.reset(buffer);
- byte[] bufferArray = buffer.array();
- int tupleCount = inputAccessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- if (curRunSize >= outputLimit) {
- peek = sTree.peekMax();
- if (isEntryValid(peek)
- && compareRecords(inputAccessor, i,
- peek[SortMinMaxHeap.FRAME_IX],
- peek[SortMinMaxHeap.OFFSET_IX]) >= 0) {
- continue;
- }
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inputAccessor.reset(buffer);
+ byte[] bufferArray = buffer.array();
+ int tupleCount = inputAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ if (curRunSize >= outputLimit) {
+ sTree.peekMax(peek);
+ if (isEntryValid(peek)
+ && compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
+ peek[SortMinMaxHeap.OFFSET_IX]) >= 0) {
+ continue;
+ }
+ }
- allocationPtr.clear();
- int tLength = inputAccessor.getTupleEndOffset(i)
- - inputAccessor.getTupleStartOffset(i);
- memMgr.allocate(tLength, allocationPtr);
- while (allocationPtr.isNull()) {
- int unAllocSize = -1;
- while (unAllocSize < tLength) {
- unAllocSize = outputRecord();
- if (unAllocSize < 1) {
- throw new HyracksDataException(
- "Unable to allocate space for the new tuple, while there is no more tuple to output");
- }
- }
- memMgr.allocate(tLength, allocationPtr);
- }
+ allocationPtr.clear();
+ int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
+ memMgr.allocate(tLength, allocationPtr);
+ while (allocationPtr.isNull()) {
+ int unAllocSize = -1;
+ while (unAllocSize < tLength) {
+ unAllocSize = outputRecord();
+ if (unAllocSize < 1) {
+ throw new HyracksDataException(
+ "Unable to allocate space for the new tuple, while there is no more tuple to output");
+ }
+ }
+ memMgr.allocate(tLength, allocationPtr);
+ }
- int pnk = getPNK(inputAccessor, i, bufferArray);
- int runId = getRunId(inputAccessor, i);
- if (runId != curRunId) { // tuple belongs to the next run
- memMgr.writeTuple(allocationPtr.getFrameIx(),
- allocationPtr.getOffset(), inputAccessor, i);
- int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
- allocationPtr.getOffset(), pnk };
- sTree.insert(entry);
- nextRunSize++;
- continue;
- }
- // belongs to the current run
- if (curRunSize < outputLimit) {
- memMgr.writeTuple(allocationPtr.getFrameIx(),
- allocationPtr.getOffset(), inputAccessor, i);
- int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
- allocationPtr.getOffset(), pnk };
- sTree.insert(entry);
- curRunSize++;
- continue;
- }
+ int pnk = getPNK(inputAccessor, i, bufferArray);
+ int runId = getRunId(inputAccessor, i);
+ if (runId != curRunId) { // tuple belongs to the next run
+ memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
+ int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
+ sTree.insert(entry);
+ nextRunSize++;
+ continue;
+ }
+ // belongs to the current run
+ if (curRunSize < outputLimit) {
+ memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
+ int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
+ sTree.insert(entry);
+ curRunSize++;
+ continue;
+ }
- peek = sTree.peekMax();
- if (compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
- peek[SortMinMaxHeap.OFFSET_IX]) > 0) {
- continue;
- }
- // replacing the max
- peek = sTree.getMax();
- discard.set(peek[SortMinMaxHeap.FRAME_IX],
- peek[SortMinMaxHeap.OFFSET_IX]);
- memMgr.unallocate(discard);
- memMgr.writeTuple(allocationPtr.getFrameIx(),
- allocationPtr.getOffset(), inputAccessor, i);
- int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
- allocationPtr.getOffset(), pnk };
- sTree.insert(entry);
- }
- }
+ sTree.peekMax(peek);
+ if (compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]) > 0) {
+ continue;
+ }
+ // replacing the max
+ sTree.getMax(peek);
+ discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
+ memMgr.unallocate(discard);
+ memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
+ int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
+ sTree.insert(entry);
+ }
+ }
- @Override
- public void fail() throws HyracksDataException {
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ }
- @Override
- public void close() throws HyracksDataException {
- while (!sTree.isEmpty()) { // Outputting remaining elements in the
- // selectionTree
- outputRecordForClose();
- }
+ @Override
+ public void close() throws HyracksDataException {
+ while (!sTree.isEmpty()) { // Outputting remaining elements in the
+ // selectionTree
+ outputRecordForClose();
+ }
- if (outputAppender.getTupleCount() > 0) { // Writing out very last
- // resident records to file
- FrameUtils.flushFrame(outputBuffer, writer);
- }
+ if (outputAppender.getTupleCount() > 0) { // Writing out very last
+ // resident records to file
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
- writer.close();
- runs.add(writer.createReader());
- }
+ writer.close();
+ runs.add(writer.createReader());
+ }
- public List<IFrameReader> getRuns() {
- return runs;
- }
+ public List<IFrameReader> getRuns() {
+ return runs;
+ }
- private int outputRecord() throws HyracksDataException {
- outputedTuple.clear();
- sTreeTop = sTree.getMin();
- if (!isEntryValid(sTreeTop)) {
- throw new HyracksDataException(
- "Invalid outputed tuple (Top of the selection tree is invalid)");
- }
- int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
- int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
- if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] == curRunId) {
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can
- // not
- // append
- // to
- // the
- // tupleAppender
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
- throw new HyracksDataException(
- "Can not append to the ouput buffer in sort");
- }
- lastTupleIx = 0;
- } else {
- lastTupleIx++;
- }
- outputedTuple.set(tFrameIx, tOffset);
- newRun = false;
- return memMgr.unallocate(outputedTuple);
- }
- // Minimum belongs to the next Run
- openNewRun();
- int popCount = curRunSize - outputLimit;
- int l = 0;
- int maxFreedSpace = 0;
- for (int p = 0; p < popCount; p++) {
- peek = sTree.getMax();
- if (!isEntryValid(peek)) {
- throw new HyracksDataException(
- "Invalid Maximum extracted from MinMaxHeap");
- }
- discard.set(peek[SortMinMaxHeap.FRAME_IX],
- peek[SortMinMaxHeap.OFFSET_IX]);
- l = memMgr.unallocate(discard);
- if (l > maxFreedSpace) {
- maxFreedSpace = l;
- }
- curRunSize--;
- }
+ private int outputRecord() throws HyracksDataException {
+ outputedTuple.clear();
+ sTree.getMin(sTreeTop);
+ if (!isEntryValid(sTreeTop)) {
+ throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
+ }
+ int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+ int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+ if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] == curRunId) {
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can
+ // not
+ // append
+ // to
+ // the
+ // tupleAppender
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ throw new HyracksDataException("Can not append to the ouput buffer in sort");
+ }
+ lastTupleIx = 0;
+ } else {
+ lastTupleIx++;
+ }
+ outputedTuple.set(tFrameIx, tOffset);
+ newRun = false;
+ return memMgr.unallocate(outputedTuple);
+ }
+ // Minimum belongs to the next Run
+ openNewRun();
+ int popCount = curRunSize - outputLimit;
+ int l = 0;
+ int maxFreedSpace = 0;
+ for (int p = 0; p < popCount; p++) {
+ sTree.getMax(peek);
+ if (!isEntryValid(peek)) {
+ throw new HyracksDataException("Invalid Maximum extracted from MinMaxHeap");
+ }
+ discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
+ l = memMgr.unallocate(discard);
+ if (l > maxFreedSpace) {
+ maxFreedSpace = l;
+ }
+ curRunSize--;
+ }
- if (maxFreedSpace != 0) {
- return maxFreedSpace;
- }
- // No max discarded (We just flushed out the prev run, so the output
- // buffer should be clear
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
- // append to
- // the
- // tupleAppender
- throw new HyracksDataException(
- "Can not append to the ouput buffer in sort");
- }
- lastTupleIx = 0;
- outputedTuple.set(tFrameIx, tOffset);
- newRun = false;
- return memMgr.unallocate(outputedTuple);
- }
+ if (maxFreedSpace != 0) {
+ return maxFreedSpace;
+ }
+ // No max discarded (We just flushed out the prev run, so the output
+ // buffer should be clear)
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
+ // append to
+ // the
+ // tupleAppender
+ throw new HyracksDataException("Can not append to the ouput buffer in sort");
+ }
+ lastTupleIx = 0;
+ outputedTuple.set(tFrameIx, tOffset);
+ newRun = false;
+ return memMgr.unallocate(outputedTuple);
+ }
- private void outputRecordForClose() throws HyracksDataException {
- sTreeTop = sTree.getMin();
- if (!isEntryValid(sTreeTop)) {
- throw new HyracksDataException(
- "Invalid outputed tuple (Top of the selection tree is invalid)");
- }
- int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
- int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
- if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] != curRunId) {
- openNewRun();
- }
+ private void outputRecordForClose() throws HyracksDataException {
+ sTree.getMin(sTreeTop);
+ if (!isEntryValid(sTreeTop)) {
+ throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
+ }
+ int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+ int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+ if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] != curRunId) {
+ openNewRun();
+ }
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
- // append to
- // the
- // tupleAppender
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
- throw new HyracksDataException(
- "Can not append to the ouput buffer in sort");
- }
- }
- }
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
+ // append to
+ // the
+ // tupleAppender
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ throw new HyracksDataException("Can not append to the ouput buffer in sort");
+ }
+ }
+ }
- private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) { // Moved
- // buffInArray
- // out
- // for
- // better
- // performance
- // (not
- // converting
- // for
- // each
- // and
- // every
- // tuple)
- int sfIdx = sortFields[0];
- int tStart = fta.getTupleStartOffset(tIx);
- int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
- int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
- int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
- return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel
- - f0StartRel));
- }
+ private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) { // Moved
+ // buffInArray
+ // out
+ // for
+ // better
+ // performance
+ // (not
+ // converting
+ // for
+ // each
+ // and
+ // every
+ // tuple)
+ int sfIdx = sortFields[0];
+ int tStart = fta.getTupleStartOffset(tIx);
+ int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
+ int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
+ int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
+ return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel - f0StartRel));
+ }
- private int getRunId(FrameTupleAccessor fta, int tupIx) { // Comparing
- // current
- // record to
- // last output
- // record, it
- // decided about
- // current
- // record's
- // runId
- if (newRun) { // Very first record for a new run
- return curRunId;
- }
+ private int getRunId(FrameTupleAccessor fta, int tupIx) { // Comparing
+ // current
+ // record to
+ // last output
+ // record, it
+ // decides about
+ // current
+ // record's
+ // runId
+ if (newRun) { // Very first record for a new run
+ return curRunId;
+ }
- byte[] lastRecBuff = outputBuffer.array();
- lastRecordAccessor.reset(outputBuffer);
- int lastStartOffset = lastRecordAccessor
- .getTupleStartOffset(lastTupleIx);
+ byte[] lastRecBuff = outputBuffer.array();
+ lastRecordAccessor.reset(outputBuffer);
+ int lastStartOffset = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
- ByteBuffer fr2 = fta.getBuffer();
- byte[] curRecBuff = fr2.array();
- int r2StartOffset = fta.getTupleStartOffset(tupIx);
+ ByteBuffer fr2 = fta.getBuffer();
+ byte[] curRecBuff = fr2.array();
+ int r2StartOffset = fta.getTupleStartOffset(tupIx);
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset
- + (fIdx - 1) * 4);
- int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
- int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength()
- + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
- * 4);
- int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
- int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
- int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2,
- l2);
- if (c != 0) {
- if (c <= 0) {
- return curRunId;
- } else {
- return (curRunId + 1);
- }
- }
- }
- return curRunId;
- }
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset + (fIdx - 1) * 4);
+ int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
+ int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength() + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
+ int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+ int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+ int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2, l2);
+ if (c != 0) {
+ if (c <= 0) {
+ return curRunId;
+ } else {
+ return (curRunId + 1);
+ }
+ }
+ }
+ return curRunId;
+ }
- // first<sec : -1
- private int compareRecords(FrameTupleAccessor fta1, int ix1, int fix2,
- int offset2) {
- ByteBuffer buff1 = fta1.getBuffer();
- byte[] recBuff1 = buff1.array();
- int offset1 = fta1.getTupleStartOffset(ix1);
+ // first<sec : -1
+ private int compareRecords(FrameTupleAccessor fta1, int ix1, int fix2, int offset2) {
+ ByteBuffer buff1 = fta1.getBuffer();
+ byte[] recBuff1 = buff1.array();
+ int offset1 = fta1.getTupleStartOffset(ix1);
- offset2 += BSTNodeUtil.HEADER_SIZE;
- ByteBuffer buff2 = memMgr.getFrame(fix2);
- fta2.reset(buff2);
- byte[] recBuff2 = buff2.array();
+ offset2 += BSTNodeUtil.HEADER_SIZE;
+ ByteBuffer buff2 = memMgr.getFrame(fix2);
+ fta2.reset(buff2);
+ byte[] recBuff2 = buff2.array();
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : buff1
- .getInt(offset1 + (fIdx - 1) * 4);
- int f1End = buff1.getInt(offset1 + fIdx * 4);
- int s1 = offset1 + fta1.getFieldSlotsLength() + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : buff2
- .getInt(offset2 + (fIdx - 1) * 4);
- int f2End = buff2.getInt(offset2 + fIdx * 4);
- int s2 = offset2 + fta2.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
- int c = comparators[f].compare(recBuff1, s1, l1, recBuff2, s2, l2);
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : buff1.getInt(offset1 + (fIdx - 1) * 4);
+ int f1End = buff1.getInt(offset1 + fIdx * 4);
+ int s1 = offset1 + fta1.getFieldSlotsLength() + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : buff2.getInt(offset2 + (fIdx - 1) * 4);
+ int f2End = buff2.getInt(offset2 + fIdx * 4);
+ int s2 = offset2 + fta2.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+ int c = comparators[f].compare(recBuff1, s1, l1, recBuff2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
- }
+ }
- private void openNewRun() throws HyracksDataException {
- if (writer != null) { // There is a prev run, so flush its tuples and
- // close it first
- if (outputAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- outputAppender.reset(outputBuffer, true);
- writer.close();
- runs.add(writer.createReader());
- }
+ private void openNewRun() throws HyracksDataException {
+ if (writer != null) { // There is a prev run, so flush its tuples and
+ // close it first
+ if (outputAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ outputAppender.reset(outputBuffer, true);
+ writer.close();
+ runs.add(writer.createReader());
+ }
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalSortRunGenerator.class.getSimpleName());
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- curRunId++;
- newRun = true;
- curRunSize = nextRunSize;
- nextRunSize = 0;
- lastTupleIx = -1;
- }
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalSortRunGenerator.class.getSimpleName());
+ writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ curRunId++;
+ newRun = true;
+ curRunSize = nextRunSize;
+ nextRunSize = 0;
+ lastTupleIx = -1;
+ }
- private boolean isEntryValid(int[] entry) {
- return ((entry[SortMinHeap.RUN_ID_IX] > -1)
- && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
- }
+ private boolean isEntryValid(int[] entry) {
+ return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
index 67f3498..53da739 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
@@ -45,189 +45,184 @@
* find the top tuple at each iteration, among all the runs' heads in
* memory (check RunMergingFrameReader for more details). Otherwise,
* assuming that we have R runs and M memory buffers, where (R > M), we
- * first merge first (M-1) runs and create a new sorted run. Discarding
- * the first (M-1) runs, now merging step gets applied recursively on
- * the (R-M+2) remaining runs using the M memory buffers.
+ * first merge the first (M-1) runs and create a new sorted run out of
+ * them. After discarding the first (M-1) runs, the merging procedure
+ * gets applied recursively on the (R-M+2) remaining runs using the M
+ * memory buffers.
*
- * Merging also takes the outputLimit L, if specified, into account
- * during merging. Once the final pass is done on the runs (which is the
- * pass that generates the final sorted output), as soon as the output
- * hits the limit L, the process stops, closes, and returns.
+ * Merging also takes the outputLimit, if specified, into account. During
+ * the final pass on the runs (which is the pass that generates the final
+ * sorted output), as soon as the output size hits the output limit, the
+ * process stops, closes, and returns.
*/
public class OptimizedExternalSortRunMerger {
- private final IHyracksTaskContext ctx;
- private final List<IFrameReader> runs;
- private final int[] sortFields;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDesc;
- private final int framesLimit;
- private final IFrameWriter writer;
- private List<ByteBuffer> inFrames;
- private ByteBuffer outFrame;
- private FrameTupleAppender outFrameAppender;
- private FrameTupleAccessor outFrameAccessor;
- private final int outputLimit;
- private int currentSize;
+ private final IHyracksTaskContext ctx;
+ private final List<IFrameReader> runs;
+ private final int[] sortFields;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDesc;
+ private final int framesLimit;
+ private final IFrameWriter writer;
+ private List<ByteBuffer> inFrames;
+ private ByteBuffer outFrame;
+ private FrameTupleAppender outFrameAppender;
+ private FrameTupleAccessor outFrameAccessor;
+ private final int outputLimit;
+ private int currentSize;
- public OptimizedExternalSortRunMerger(IHyracksTaskContext ctx,
- int outputLimit, List<IFrameReader> runs, int[] sortFields,
- IBinaryComparator[] comparators, RecordDescriptor recordDesc,
- int framesLimit, IFrameWriter writer) {
- this.ctx = ctx;
- this.runs = new LinkedList<IFrameReader>(runs);
- this.sortFields = sortFields;
- this.comparators = comparators;
- this.recordDesc = recordDesc;
- this.framesLimit = framesLimit;
- this.writer = writer;
- this.outputLimit = outputLimit;
- this.currentSize = 0;
- }
+ public OptimizedExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs,
+ int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
+ IFrameWriter writer) {
+ this.ctx = ctx;
+ this.runs = new LinkedList<IFrameReader>(runs);
+ this.sortFields = sortFields;
+ this.comparators = comparators;
+ this.recordDesc = recordDesc;
+ this.framesLimit = framesLimit;
+ this.writer = writer;
+ this.outputLimit = outputLimit;
+ this.currentSize = 0;
+ }
- public void process() throws HyracksDataException {
- writer.open();
+ public void process() throws HyracksDataException {
+ writer.open();
- try {
- outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDesc);
- outFrame = ctx.allocateFrame();
- outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
- outFrameAppender.reset(outFrame, true);
+ try {
+ outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+ outFrame = ctx.allocateFrame();
+ outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outFrameAppender.reset(outFrame, true);
- if (runs.size() == 1) {
+ if (runs.size() == 1) {
- if (outputLimit < 1) {
- runs.get(0).open();
- ByteBuffer nextFrame = ctx.allocateFrame();
- while (runs.get(0).nextFrame(nextFrame)) {
- FrameUtils.flushFrame(nextFrame, writer);
- outFrameAppender.reset(nextFrame, true);
- }
- return;
- }
+ if (outputLimit < 1) {
+ runs.get(0).open();
+ ByteBuffer nextFrame = ctx.allocateFrame();
+ while (runs.get(0).nextFrame(nextFrame)) {
+ FrameUtils.flushFrame(nextFrame, writer);
+ outFrameAppender.reset(nextFrame, true);
+ }
+ return;
+ }
- int totalCount = 0;
- runs.get(0).open();
- FrameTupleAccessor fta = new FrameTupleAccessor(
- ctx.getFrameSize(), recordDesc);
- ByteBuffer nextFrame = ctx.allocateFrame();
- while (totalCount <= outputLimit
- && runs.get(0).nextFrame(nextFrame)) {
- fta.reset(nextFrame);
- int tupCount = fta.getTupleCount();
- if ((totalCount + tupCount) < outputLimit) {
- FrameUtils.flushFrame(nextFrame, writer);
- totalCount += tupCount;
- continue;
- }
- // The very last buffer, which exceeds the limit
- int copyCount = outputLimit - totalCount;
- outFrameAppender.reset(outFrame, true);
- for (int i = 0; i < copyCount; i++) {
- if (!outFrameAppender.append(fta, i)) {
- throw new IllegalStateException();
- }
- totalCount++;
- }
- }
+ int totalCount = 0;
+ runs.get(0).open();
+ FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+ ByteBuffer nextFrame = ctx.allocateFrame();
+ while (totalCount <= outputLimit && runs.get(0).nextFrame(nextFrame)) {
+ fta.reset(nextFrame);
+ int tupCount = fta.getTupleCount();
+ if ((totalCount + tupCount) < outputLimit) {
+ FrameUtils.flushFrame(nextFrame, writer);
+ totalCount += tupCount;
+ continue;
+ }
+ // The very last buffer, which exceeds the limit
+ int copyCount = outputLimit - totalCount;
+ outFrameAppender.reset(outFrame, true);
+ for (int i = 0; i < copyCount; i++) {
+ if (!outFrameAppender.append(fta, i)) {
+ throw new IllegalStateException();
+ }
+ totalCount++;
+ }
+ }
- if (outFrameAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- }
+ if (outFrameAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ outFrameAppender.reset(outFrame, true);
+ }
- return;
- }
+ return;
+ }
- // More than one run, actual merging is needed
- inFrames = new ArrayList<ByteBuffer>();
- for (int i = 0; i < framesLimit - 1; ++i) {
- inFrames.add(ctx.allocateFrame());
- }
- while (runs.size() > 0) {
- try {
- doPass(runs);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ // More than one run, actual merging is needed
+ inFrames = new ArrayList<ByteBuffer>();
+ for (int i = 0; i < framesLimit - 1; ++i) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ while (runs.size() > 0) {
+ try {
+ doPass(runs);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- }
- }
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
- // creates a new run from runs that can fit in memory.
- private void doPass(List<IFrameReader> runs) throws HyracksDataException {
- FileReference newRun = null;
- IFrameWriter writer = this.writer;
- boolean finalPass = false;
- if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
- finalPass = true;
- for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
- inFrames.remove(i);
- }
- } else {
- newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
- .getSimpleName());
- writer = new RunFileWriter(newRun, ctx.getIOManager());
- writer.open();
- }
- try {
- IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
- for (int i = 0; i < inFrames.size(); i++) {
- runCursors[i] = runs.get(i);
- }
- RunMergingFrameReader merger = new RunMergingFrameReader(ctx,
- runCursors, inFrames, sortFields, comparators, recordDesc);
- merger.open();
- try {
- while (merger.nextFrame(outFrame)) {
- if (outputLimit > 0 && finalPass) {
- outFrameAccessor.reset(outFrame);
- int count = outFrameAccessor.getTupleCount();
- if ((currentSize + count) > outputLimit) {
- ByteBuffer b = ctx.allocateFrame();
- FrameTupleAppender partialAppender = new FrameTupleAppender(
- ctx.getFrameSize());
- partialAppender.reset(b, true);
- int copyCount = outputLimit - currentSize;
- for (int i = 0; i < copyCount; i++) {
- partialAppender.append(outFrameAccessor, i);
- currentSize++;
- }
- FrameUtils.makeReadable(b);
- FrameUtils.flushFrame(b, writer);
- break;
- } else {
- FrameUtils.flushFrame(outFrame, writer);
- currentSize += count;
- }
- } else {
- FrameUtils.flushFrame(outFrame, writer);
- }
- }
- } finally {
- merger.close();
- }
+ /**
+ * Performs one merge pass over the head of {@code runs}, merging one run per available input frame.
+ *
+ * If all runs plus one output frame fit within {@code framesLimit}, this is the final pass: surplus
+ * input frames are dropped and the merged output goes to {@code this.writer}, truncated to
+ * {@code outputLimit} tuples when that limit is positive. Otherwise the merged output is written to
+ * a newly created run file, whose reader is put back at the head of {@code runs} for a later pass.
+ *
+ * @param runs
+ * the remaining runs; the merged runs are removed from it (or it is cleared once the output limit is reached)
+ * @throws HyracksDataException
+ * declared for failures raised by the readers/writers used during the pass
+ */
+ private void doPass(List<IFrameReader> runs) throws HyracksDataException {
+ FileReference newRun = null;
+ IFrameWriter writer = this.writer;
+ boolean finalPass = false;
+ if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
+ finalPass = true;
+ for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+ inFrames.remove(i);
+ }
+ } else {
+ newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
+ writer = new RunFileWriter(newRun, ctx.getIOManager());
+ writer.open();
+ }
+ try {
+ // Allocate with the interface type: the list holds IFrameReader instances, and an array created as
+ // RunFileReader[] would throw ArrayStoreException for any other reader implementation.
+ IFrameReader[] runCursors = new IFrameReader[inFrames.size()];
+ for (int i = 0; i < inFrames.size(); i++) {
+ runCursors[i] = runs.get(i);
+ }
+ RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
+ comparators, recordDesc);
+ merger.open();
+ try {
+ while (merger.nextFrame(outFrame)) {
+ if (outputLimit > 0 && finalPass) {
+ outFrameAccessor.reset(outFrame);
+ int count = outFrameAccessor.getTupleCount();
+ if ((currentSize + count) > outputLimit) {
+ // This frame crosses the limit: copy only the tuples still needed into a fresh frame.
+ ByteBuffer b = ctx.allocateFrame();
+ FrameTupleAppender partialAppender = new FrameTupleAppender(ctx.getFrameSize());
+ partialAppender.reset(b, true);
+ int copyCount = outputLimit - currentSize;
+ for (int i = 0; i < copyCount; i++) {
+ // Same handling as the single-run path: a rejected append must not be counted as output.
+ if (!partialAppender.append(outFrameAccessor, i)) {
+ throw new IllegalStateException();
+ }
+ currentSize++;
+ }
+ FrameUtils.makeReadable(b);
+ FrameUtils.flushFrame(b, writer);
+ break;
+ } else {
+ FrameUtils.flushFrame(outFrame, writer);
+ currentSize += count;
+ }
+ } else {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ }
+ } finally {
+ merger.close();
+ }
- if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
- runs.clear();
- return;
- }
+ // Limit reached on the final pass: nothing left to merge.
+ if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
+ runs.clear();
+ return;
+ }
- runs.subList(0, inFrames.size()).clear();
- if (!finalPass) {
- runs.add(0, ((RunFileWriter) writer).createReader());
- }
- } finally {
- if (!finalPass) {
- writer.close();
- }
- }
- }
+ runs.subList(0, inFrames.size()).clear();
+ if (!finalPass) {
+ runs.add(0, ((RunFileWriter) writer).createReader());
+ }
+ } finally {
+ // Only the intermediate run writer is owned by this pass; this.writer is left to the caller.
+ if (!finalPass) {
+ writer.close();
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index e791582..c014ed1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -31,23 +31,14 @@
public class RunMergingFrameReader implements IFrameReader {
private final IHyracksTaskContext ctx;
-
private final IFrameReader[] runCursors;
-
private final List<ByteBuffer> inFrames;
-
private final int[] sortFields;
-
private final IBinaryComparator[] comparators;
-
private final RecordDescriptor recordDesc;
-
private final FrameTupleAppender outFrameAppender;
-
private ReferencedPriorityQueue topTuples;
-
private int[] tupleIndexes;
-
private FrameTupleAccessor[] tupleAccessors;
public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
index 85bcba2..2ceea17 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
@@ -1,65 +1,70 @@
package edu.uci.ics.hyracks.dataflow.std.sort;
/**
- * @author pouria Defines a slot in the memory, which can be a free or allocated
+ * @author pouria
+ *
+ * Defines a slot in the memory, which can be a free or used (allocated)
* slot. Memory is a set of frames, ordered as a list. Each tuple is
* stored in a slot, where the location of the slot is denoted by a pair
- * of integers: - The index of the frame, in the lits of frames in
- * memory - The starting offset of the slot, within the the specific
- * frame
+ * of integers:
+ *
+ * - The index of the frame, in the list of frames in memory. (referred
+ * to as frameIx)
+ *
+ * - The starting offset of the slot, within that specific frame.
+ * (referred to as offset)
*/
public class Slot {
- private int frameIx;
- private int offset;
+ private int frameIx;
+ private int offset;
- public Slot() {
- this.frameIx = BSTNodeUtil.INVALID_INDEX;
- this.offset = BSTNodeUtil.INVALID_INDEX;
- }
+ public Slot() {
+ this.frameIx = BSTNodeUtil.INVALID_INDEX;
+ this.offset = BSTNodeUtil.INVALID_INDEX;
+ }
- public Slot(int frameIx, int offset) {
- this.frameIx = frameIx;
- this.offset = offset;
- }
+ public Slot(int frameIx, int offset) {
+ this.frameIx = frameIx;
+ this.offset = offset;
+ }
- public void set(int frameIx, int offset) {
- this.frameIx = frameIx;
- this.offset = offset;
- }
+ public void set(int frameIx, int offset) {
+ this.frameIx = frameIx;
+ this.offset = offset;
+ }
- public int getFrameIx() {
- return frameIx;
- }
+ public int getFrameIx() {
+ return frameIx;
+ }
- public void setFrameIx(int frameIx) {
- this.frameIx = frameIx;
- }
+ public void setFrameIx(int frameIx) {
+ this.frameIx = frameIx;
+ }
- public int getOffset() {
- return offset;
- }
+ public int getOffset() {
+ return offset;
+ }
- public void setOffset(int offset) {
- this.offset = offset;
- }
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
- public boolean isNull() {
- return (frameIx == BSTNodeUtil.INVALID_INDEX)
- || (offset == BSTNodeUtil.INVALID_INDEX);
- }
+ public boolean isNull() {
+ return (frameIx == BSTNodeUtil.INVALID_INDEX) || (offset == BSTNodeUtil.INVALID_INDEX);
+ }
- public void clear() {
- this.frameIx = BSTNodeUtil.INVALID_INDEX;
- this.offset = BSTNodeUtil.INVALID_INDEX;
- }
+ public void clear() {
+ this.frameIx = BSTNodeUtil.INVALID_INDEX;
+ this.offset = BSTNodeUtil.INVALID_INDEX;
+ }
- public void copy(Slot s) {
- this.frameIx = s.getFrameIx();
- this.offset = s.getOffset();
- }
+ public void copy(Slot s) {
+ this.frameIx = s.getFrameIx();
+ this.offset = s.getOffset();
+ }
- public String toString() {
- return "(" + frameIx + ", " + offset + ")";
- }
+ public String toString() {
+ return "(" + frameIx + ", " + offset + ")";
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
index d811de8..383fba8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
@@ -12,238 +12,245 @@
/**
*
- * @author pouria Implements a minimum binary heap, used as selection tree, for
- * sort with replacement. This heap structure can only be used as the
- * min heap (no access to the max element) Two elements in the heap are
+ * @author pouria
+ *
+ * Implements a minimum binary heap, used as selection tree, for sort
+ * with replacement. This heap structure can only be used as the min
+ * heap (no access to the max element). Elements in the heap are
* compared based on their run numbers, and sorting key(s):
*
- * Considering two elements A and B:
+ * Considering two heap elements A and B:
*
* if RunNumber(A) > RunNumber(B) then A is larger than B if
- * RunNumber(A) == RunNumber(B), then A is smaller than B, only if the
- * value of the sort key(s) in B is greater than A (based on the sort
- * comparator)
+ * RunNumber(A) == RunNumber(B), then A is smaller than B, if and only
+ * if the value of the sort key(s) in B is greater than A (based on the
+ * sort comparator).
*
*/
public class SortMinHeap implements ISelectionTree {
- static final int RUN_ID_IX = 0;
- static final int FRAME_IX = 1;
- static final int OFFSET_IX = 2;
- final int PNK_IX = 3;
+ static final int RUN_ID_IX = 0;
+ static final int FRAME_IX = 1;
+ static final int OFFSET_IX = 2;
+ private static final int PNK_IX = 3;
- private final int[] sortFields;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDescriptor;
- private final FrameTupleAccessor fta1;
- private final FrameTupleAccessor fta2;
+ private final int[] sortFields;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDescriptor;
+ private final FrameTupleAccessor fta1;
+ private final FrameTupleAccessor fta2;
- List<int[]> tree;
- IMemoryManager memMgr;
+ List<int[]> tree;
+ IMemoryManager memMgr;
+ int[] top; // Used as the temp variable to access the top, to avoid object
+ // creation
- public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields,
- IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDesc, IMemoryManager memMgr) {
- this.sortFields = sortFields;
- this.comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- this.comparators[i] = comparatorFactories[i]
- .createBinaryComparator();
- }
- this.recordDescriptor = recordDesc;
- fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- this.memMgr = memMgr;
+ public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, IMemoryManager memMgr) {
+ this.sortFields = sortFields;
+ this.comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ this.comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ this.recordDescriptor = recordDesc;
+ fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ this.memMgr = memMgr;
- this.tree = new ArrayList<int[]>();
- }
+ this.tree = new ArrayList<int[]>();
+ }
- /*
- * Assumption (element structure): [RunId][FrameIx][Offset][Poorman NK]
- */
- @Override
- public int[] getMin() {
- return (tree.size() > 0 ? delete(0) : new int[] { -1, -1, -1, -1 });
- }
+ /*
+ * Assumption (element structure): [RunId][FrameIx][Offset][Poorman NK]
+ */
+ @Override
+ public void getMin(int[] result) {
+ if (tree.size() == 0) {
+ result[0] = result[1] = result[2] = result[3] = -1;
+ return;
+ }
- @Override
- public int[] peekMin() {
- if (tree.size() == 0) {
- return (new int[] { -1, -1, -1, -1 });
- }
- int[] top = tree.get(0);
- return new int[] { top[0], top[1], top[2], top[3] };
- }
+ top = delete(0);
+ for (int i = 0; i < top.length; i++) {
+ result[i] = top[i];
+ }
+ }
- @Override
- public void insert(int[] e) {
- tree.add(e);
- siftUp(tree.size() - 1);
- }
+ @Override
+ public void peekMin(int[] result) {
+ if (tree.size() == 0) {
+ result[0] = result[1] = result[2] = result[3] = -1;
+ return;
+ }
- @Override
- public void reset() {
- this.tree.clear();
- }
+ top = tree.get(0);
+ for (int i = 0; i < top.length; i++) {
+ result[i] = top[i];
+ }
+ }
- @Override
- public boolean isEmpty() {
- return (tree.size() < 1);
- }
+ @Override
+ public void insert(int[] e) {
+ tree.add(e);
+ siftUp(tree.size() - 1);
+ }
- public int _debugGetSize() {
- return tree.size();
- }
+ @Override
+ public void reset() {
+ this.tree.clear();
+ }
- private int[] delete(int nix) {
- int[] nv = tree.get(nix);
- int[] last = tree.remove(tree.size() - 1);
+ @Override
+ public boolean isEmpty() {
+ return (tree.size() < 1);
+ }
- if (tree.size() > 0) {
- tree.set(nix, last);
- } else {
- return nv;
- }
+ public int _debugGetSize() {
+ return tree.size();
+ }
- int pIx = getParent(nix);
- if (pIx > -1 && (compare(last, tree.get(pIx)) < 0)) {
- siftUp(nix);
- } else {
- siftDown(nix);
- }
- return nv;
- }
+ private int[] delete(int nix) {
+ int[] nv = tree.get(nix);
+ int[] last = tree.remove(tree.size() - 1);
- private void siftUp(int nodeIx) {
- int p = getParent(nodeIx);
- if (p < 0) {
- return;
- }
- while (p > -1 && (compare(nodeIx, p) < 0)) {
- swap(p, nodeIx);
- nodeIx = p;
- p = getParent(nodeIx);
- if (p < 0) { // We are at the root
- return;
- }
- }
- }
+ if (tree.size() > 0) {
+ tree.set(nix, last);
+ } else {
+ return nv;
+ }
- private void siftDown(int nodeIx) {
- int mix = getMinOfChildren(nodeIx);
- if (mix < 0) {
- return;
- }
- while (mix > -1 && (compare(mix, nodeIx) < 0)) {
- swap(mix, nodeIx);
- nodeIx = mix;
- mix = getMinOfChildren(nodeIx);
- if (mix < 0) { // We hit the leaf level
- return;
- }
- }
- }
+ int pIx = getParent(nix);
+ if (pIx > -1 && (compare(last, tree.get(pIx)) < 0)) {
+ siftUp(nix);
+ } else {
+ siftDown(nix);
+ }
+ return nv;
+ }
- // first < sec : -1
- private int compare(int nodeSIx1, int nodeSIx2) {
- int[] n1 = tree.get(nodeSIx1);
- int[] n2 = tree.get(nodeSIx2);
- return (compare(n1, n2));
- }
+ private void siftUp(int nodeIx) {
+ int p = getParent(nodeIx);
+ if (p < 0) {
+ return;
+ }
+ while (p > -1 && (compare(nodeIx, p) < 0)) {
+ swap(p, nodeIx);
+ nodeIx = p;
+ p = getParent(nodeIx);
+ if (p < 0) { // We are at the root
+ return;
+ }
+ }
+ }
- // first < sec : -1
- private int compare(int[] n1, int[] n2) {
- // Compare Run Numbers
- if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
- return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
- }
+ private void siftDown(int nodeIx) {
+ int mix = getMinOfChildren(nodeIx);
+ if (mix < 0) {
+ return;
+ }
+ while (mix > -1 && (compare(mix, nodeIx) < 0)) {
+ swap(mix, nodeIx);
+ nodeIx = mix;
+ mix = getMinOfChildren(nodeIx);
+ if (mix < 0) { // We hit the leaf level
+ return;
+ }
+ }
+ }
- // Compare Poor man Normalized Keys
- if (n1[PNK_IX] != n2[PNK_IX]) {
- return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1
- : 1;
- }
+ // first < sec : -1
+ private int compare(int nodeSIx1, int nodeSIx2) {
+ int[] n1 = tree.get(nodeSIx1);
+ int[] n2 = tree.get(nodeSIx2);
+ return (compare(n1, n2));
+ }
- return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]),
- n1[OFFSET_IX], n2[OFFSET_IX]);
- }
+ // first < sec : -1
+ private int compare(int[] n1, int[] n2) {
+ // Compare Run Numbers
+ if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
+ return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
+ }
- private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset,
- int r2StartOffset) {
- byte[] b1 = fr1.array();
- byte[] b2 = fr2.array();
- fta1.reset(fr1);
- fta2.reset(fr2);
- int headerLen = BSTNodeUtil.HEADER_SIZE;
- r1StartOffset += headerLen;
- r2StartOffset += headerLen;
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1)
- * 4);
- int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
- int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
- * 4);
- int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
- int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
+ // Compare Poor man Normalized Keys
+ if (n1[PNK_IX] != n2[PNK_IX]) {
+ return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1 : 1;
+ }
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]), n1[OFFSET_IX], n2[OFFSET_IX]);
+ }
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
+ private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset) {
+ byte[] b1 = fr1.array();
+ byte[] b2 = fr2.array();
+ fta1.reset(fr1);
+ fta2.reset(fr2);
+ int headerLen = BSTNodeUtil.HEADER_SIZE;
+ r1StartOffset += headerLen;
+ r2StartOffset += headerLen;
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1) * 4);
+ int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
+ int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
+ int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+ int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
- private int getMinOfChildren(int nix) { // returns index of min child
- int lix = getLeftChild(nix);
- if (lix < 0) {
- return -1;
- }
- int rix = getRightChild(nix);
- if (rix < 0) {
- return lix;
- }
- return ((compare(lix, rix) < 0) ? lix : rix);
- }
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- private void swap(int n1Ix, int n2Ix) {
- int[] temp = tree.get(n1Ix);
- tree.set(n1Ix, tree.get(n2Ix));
- tree.set(n2Ix, temp);
- }
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
- private int getLeftChild(int ix) {
- int lix = 2 * ix + 1;
- return ((lix < tree.size()) ? lix : -1);
- }
+ private int getMinOfChildren(int nix) { // returns index of min child
+ int lix = getLeftChild(nix);
+ if (lix < 0) {
+ return -1;
+ }
+ int rix = getRightChild(nix);
+ if (rix < 0) {
+ return lix;
+ }
+ return ((compare(lix, rix) < 0) ? lix : rix);
+ }
- private int getRightChild(int ix) {
- int rix = 2 * ix + 2;
- return ((rix < tree.size()) ? rix : -1);
- }
+ private void swap(int n1Ix, int n2Ix) {
+ int[] temp = tree.get(n1Ix);
+ tree.set(n1Ix, tree.get(n2Ix));
+ tree.set(n2Ix, temp);
+ }
- private int getParent(int ix) {
- return ((ix - 1) / 2);
- }
+ private int getLeftChild(int ix) {
+ int lix = 2 * ix + 1;
+ return ((lix < tree.size()) ? lix : -1);
+ }
- private ByteBuffer getFrame(int frameIx) {
- return (memMgr.getFrame(frameIx));
- }
+ private int getRightChild(int ix) {
+ int rix = 2 * ix + 2;
+ return ((rix < tree.size()) ? rix : -1);
+ }
- @Override
- public int[] getMax() {
- System.err.println("getMax() method not implemented for Min Heap");
- return null;
- }
+ /**
+ * Returns the index of the parent of node {@code ix}, or -1 when {@code ix} is the root.
+ * The root is handled explicitly because integer division truncates toward zero, so
+ * {@code (0 - 1) / 2} evaluates to 0 rather than a negative value.
+ */
+ private int getParent(int ix) {
+ return (ix > 0) ? ((ix - 1) / 2) : -1;
+ }
- @Override
- public int[] peekMax() {
- System.err.println("peekMax() method not implemented for Min Heap");
- return null;
- }
+ private ByteBuffer getFrame(int frameIx) {
+ return (memMgr.getFrame(frameIx));
+ }
+
+ @Override
+ public void getMax(int[] result) {
+ throw new IllegalStateException("getMax() method not applicable to Min Heap");
+ }
+
+ /**
+ * Not supported: this structure only gives access to its minimum element.
+ *
+ * @throws IllegalStateException
+ * always
+ */
+ @Override
+ public void peekMax(int[] result) {
+ throw new IllegalStateException("peekMax() method not applicable to Min Heap");
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
index ab66245..c040ccc 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
@@ -18,377 +18,402 @@
* elements.
*/
public class SortMinMaxHeap implements ISelectionTree {
- static final int RUN_ID_IX = 0;
- static final int FRAME_IX = 1;
- static final int OFFSET_IX = 2;
- final int PNK_IX = 3;
+ static final int RUN_ID_IX = 0;
+ static final int FRAME_IX = 1;
+ static final int OFFSET_IX = 2;
+ private static final int PNK_IX = 3;
+ private static final int NOT_EXIST = -1;
- static final int NOT_EXIST = -1;
+ private final int[] sortFields;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDescriptor;
+ private final FrameTupleAccessor fta1;
+ private final FrameTupleAccessor fta2;
- private final int[] sortFields;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDescriptor;
- private final FrameTupleAccessor fta1;
- private final FrameTupleAccessor fta2;
+ List<int[]> tree;
+ IMemoryManager memMgr;
- List<int[]> tree;
- IMemoryManager memMgr;
+ int[] top; // Used as the temp variable to access the top, to avoid object
+ // creation
- public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields,
- IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDesc, IMemoryManager memMgr) {
- this.sortFields = sortFields;
- this.comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- this.comparators[i] = comparatorFactories[i]
- .createBinaryComparator();
- }
- this.recordDescriptor = recordDesc;
- fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- this.memMgr = memMgr;
+ public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, IMemoryManager memMgr) {
+ this.sortFields = sortFields;
+ this.comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ this.comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ this.recordDescriptor = recordDesc;
+ fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ this.memMgr = memMgr;
- this.tree = new ArrayList<int[]>();
- }
+ this.tree = new ArrayList<int[]>();
+ }
- @Override
- public void insert(int[] element) {
- tree.add(element);
- bubbleUp(tree.size() - 1);
- }
+ @Override
+ public void insert(int[] element) {
+ tree.add(element);
+ bubbleUp(tree.size() - 1);
+ }
- @Override
- public int[] getMin() {
- return (tree.size() > 0 ? delete(0) : new int[] { -1, -1, -1, -1 });
- }
+ @Override
+ public void getMin(int[] result) {
+ if (tree.size() == 0) {
+ result[0] = result[1] = result[2] = result[3] = -1;
+ return;
+ }
- @Override
- public void reset() {
- this.tree.clear();
- }
+ top = delete(0);
+ for (int i = 0; i < top.length; i++) {
+ result[i] = top[i];
+ }
+ }
- @Override
- public boolean isEmpty() {
- return (tree.size() < 1);
- }
+ @Override
+ public void reset() {
+ this.tree.clear();
+ }
- @Override
- public int[] peekMin() {
- if (tree.size() == 0) {
- return (new int[] { -1, -1, -1, -1 });
- }
- int[] top = tree.get(0);
- return new int[] { top[0], top[1], top[2], top[3] };
- }
+ @Override
+ public boolean isEmpty() {
+ return (tree.size() < 1);
+ }
- @Override
- public int[] getMax() {
- if (tree.size() == 1) {
- return tree.remove(0);
- }
- if (tree.size() > 1) {
- int lc = getLeftChild(0);
- int rc = getRightChild(0);
- if (rc == -1) {
- return delete(lc);
- }
- return (compare(lc, rc) < 0) ? delete(rc) : delete(lc);
- }
- return new int[] { -1, -1, -1, -1 };
- }
+ @Override
+ public void peekMin(int[] result) {
+ if (tree.size() == 0) {
+ result[0] = result[1] = result[2] = result[3] = -1;
+ return;
+ }
- @Override
- public int[] peekMax() {
- if (tree.size() == 1) {
- int[] t = tree.get(0);
- return new int[] { t[0], t[1], t[2], t[3] };
- }
- if (tree.size() > 1) {
- int lc = getLeftChild(0);
- int rc = getRightChild(0);
- if (rc == -1) {
- int[] t = tree.get(lc);
- return new int[] { t[0], t[1], t[2], t[3] };
- }
- int[] t = (compare(lc, rc) < 0) ? tree.get(rc) : tree.get(lc);
- return new int[] { t[0], t[1], t[2], t[3] };
- }
- return new int[] { -1, -1, -1, -1 };
- }
+ top = tree.get(0);
+ for (int i = 0; i < top.length; i++) {
+ result[i] = top[i];
+ }
+ }
- private int[] delete(int delIx) {
- int s = tree.size();
- if (s > 1) {
- int[] delEntry = tree.get(delIx);
- int[] last = (tree.remove(s - 1));
- if (delIx != tree.size()) {
- tree.set(delIx, last);
- trickleDown(delIx);
- }
- return delEntry;
- } else if (s == 1) {
- return tree.remove(0);
- }
- return null;
- }
+ @Override
+ public void getMax(int[] result) {
+ if (tree.size() == 1) {
+ top = tree.remove(0);
+ for (int i = 0; i < top.length; i++) {
+ result[i] = top[i];
+ }
+ return;
+ }
- private void bubbleUp(int ix) {
- int p = getParentIx(ix);
- if (isAtMinLevel(ix)) {
- if (p != NOT_EXIST && compare(p, ix) < 0) {
- swap(ix, p);
- bubbleUpMax(p);
- } else {
- bubbleUpMin(ix);
- }
- } else { // i is at max level
- if (p != NOT_EXIST && compare(ix, p) < 0) {
- swap(ix, p);
- bubbleUpMin(p);
- } else {
- bubbleUpMax(ix);
- }
- }
- }
+ if (tree.size() > 1) {
+ int lc = getLeftChild(0);
+ int rc = getRightChild(0);
- private void bubbleUpMax(int ix) {
- int gp = getGrandParent(ix);
- if (gp != NOT_EXIST && compare(gp, ix) < 0) {
- swap(ix, gp);
- bubbleUpMax(gp);
- }
- }
+ if (rc == -1) {
+ top = delete(lc);
+ } else {
+ top = (compare(lc, rc) < 0) ? delete(rc) : delete(lc);
+ }
- private void bubbleUpMin(int ix) {
- int gp = getGrandParent(ix);
- if (gp != NOT_EXIST && compare(ix, gp) < 0) {
- swap(ix, gp);
- bubbleUpMin(gp);
- }
- }
+ for (int i = 0; i < top.length; i++) {
+ result[i] = top[i];
+ }
+ return;
- private void trickleDown(int ix) {
- if (isAtMinLevel(ix)) {
- trickleDownMin(ix);
- } else {
- trickleDownMax(ix);
- }
- }
+ }
+ result[0] = result[1] = result[2] = result[3] = -1;
+ }
- private void trickleDownMax(int ix) {
- int maxIx = getMaxOfDescendents(ix);
- if (maxIx == NOT_EXIST) {
- return;
- }
- if (maxIx > getLeftChild(ix) && maxIx > getRightChild(ix)) { // A grand
- // children
- if (compare(ix, maxIx) < 0) {
- swap(maxIx, ix);
- int p = getParentIx(maxIx);
- if (p != NOT_EXIST && compare(maxIx, p) < 0) {
- swap(maxIx, p);
- }
- trickleDownMax(maxIx);
- }
- } else { // A children
- if (compare(ix, maxIx) < 0) {
- swap(ix, maxIx);
- }
- }
- }
+ @Override
+ public void peekMax(int[] result) {
+ if (tree.size() == 1) {
+ top = tree.get(0);
+ for (int i = 0; i < top.length; i++) {
+ result[i] = top[i];
+ }
+ return;
+ }
+ if (tree.size() > 1) {
+ int lc = getLeftChild(0);
+ int rc = getRightChild(0);
- private void trickleDownMin(int ix) {
- int minIx = getMinOfDescendents(ix);
- if (minIx == NOT_EXIST) {
- return;
- }
- if (minIx > getLeftChild(ix) && minIx > getRightChild(ix)) { // A grand
- // children
- if (compare(minIx, ix) < 0) {
- swap(minIx, ix);
- int p = getParentIx(minIx);
- if (p != NOT_EXIST && compare(p, minIx) < 0) {
- swap(minIx, p);
- }
- trickleDownMin(minIx);
- }
- } else { // A children
- if (compare(minIx, ix) < 0) {
- swap(ix, minIx);
- }
- }
- }
+ if (rc == -1) {
+ top = tree.get(lc);
+ } else {
+ top = (compare(lc, rc) < 0) ? tree.get(rc) : tree.get(lc);
+ }
- // Min among children and grand children
- private int getMinOfDescendents(int ix) {
- int lc = getLeftChild(ix);
- if (lc == NOT_EXIST) {
- return NOT_EXIST;
- }
- int rc = getRightChild(ix);
- if (rc == NOT_EXIST) {
- return lc;
- }
- int min = (compare(lc, rc) < 0) ? lc : rc;
- int[] lgc = getLeftGrandChildren(ix);
- int[] rgc = getRightGrandChildren(ix);
- for (int k = 0; k < 2; k++) {
- if (lgc[k] != NOT_EXIST && compare(lgc[k], min) < 0) {
- min = lgc[k];
- }
- if (rgc[k] != NOT_EXIST && compare(rgc[k], min) < 0) {
- min = rgc[k];
- }
- }
- return min;
- }
+ for (int i = 0; i < top.length; i++) {
+ result[i] = top[i];
+ }
+ return;
+ }
+ result[0] = result[1] = result[2] = result[3] = -1;
+ }
- // Max among children and grand children
- private int getMaxOfDescendents(int ix) {
- int lc = getLeftChild(ix);
- if (lc == NOT_EXIST) {
- return NOT_EXIST;
- }
- int rc = getRightChild(ix);
- if (rc == NOT_EXIST) {
- return lc;
- }
- int max = (compare(lc, rc) < 0) ? rc : lc;
- int[] lgc = getLeftGrandChildren(ix);
- int[] rgc = getRightGrandChildren(ix);
- for (int k = 0; k < 2; k++) {
- if (lgc[k] != NOT_EXIST && compare(max, lgc[k]) < 0) {
- max = lgc[k];
- }
- if (rgc[k] != NOT_EXIST && compare(max, rgc[k]) < 0) {
- max = rgc[k];
- }
- }
- return max;
- }
+ private int[] delete(int delIx) {
+ int s = tree.size();
+ if (s > 1) {
+ int[] delEntry = tree.get(delIx);
+ int[] last = (tree.remove(s - 1));
+ if (delIx != tree.size()) {
+ tree.set(delIx, last);
+ trickleDown(delIx);
+ }
+ return delEntry;
+ } else if (s == 1) {
+ return tree.remove(0);
+ }
+ return null;
+ }
- private void swap(int n1Ix, int n2Ix) {
- int[] temp = tree.get(n1Ix);
- tree.set(n1Ix, tree.get(n2Ix));
- tree.set(n2Ix, temp);
- }
+ private void bubbleUp(int ix) {
+ int p = getParentIx(ix);
+ if (isAtMinLevel(ix)) {
+ if (p != NOT_EXIST && compare(p, ix) < 0) {
+ swap(ix, p);
+ bubbleUpMax(p);
+ } else {
+ bubbleUpMin(ix);
+ }
+ } else { // i is at max level
+ if (p != NOT_EXIST && compare(ix, p) < 0) {
+ swap(ix, p);
+ bubbleUpMin(p);
+ } else {
+ bubbleUpMax(ix);
+ }
+ }
+ }
- private int getParentIx(int i) {
- if (i == 0) {
- return NOT_EXIST;
- }
- return (i - 1) / 2;
- }
+ private void bubbleUpMax(int ix) {
+ int gp = getGrandParent(ix);
+ if (gp != NOT_EXIST && compare(gp, ix) < 0) {
+ swap(ix, gp);
+ bubbleUpMax(gp);
+ }
+ }
- private int getGrandParent(int i) {
- int p = getParentIx(i);
- return p != -1 ? getParentIx(p) : NOT_EXIST;
- }
+ private void bubbleUpMin(int ix) {
+ int gp = getGrandParent(ix);
+ if (gp != NOT_EXIST && compare(ix, gp) < 0) {
+ swap(ix, gp);
+ bubbleUpMin(gp);
+ }
+ }
- private int getLeftChild(int i) {
- int lc = 2 * i + 1;
- return lc < tree.size() ? lc : NOT_EXIST;
- }
+ private void trickleDown(int ix) {
+ if (isAtMinLevel(ix)) {
+ trickleDownMin(ix);
+ } else {
+ trickleDownMax(ix);
+ }
+ }
- private int[] getLeftGrandChildren(int i) {
- int lc = getLeftChild(i);
- return lc != NOT_EXIST ? new int[] { getLeftChild(lc),
- getRightChild(lc) } : new int[] { NOT_EXIST, NOT_EXIST };
- }
+ private void trickleDownMax(int ix) {
+ int maxIx = getMaxOfDescendents(ix);
+ if (maxIx == NOT_EXIST) {
+ return;
+ }
+ if (maxIx > getLeftChild(ix) && maxIx > getRightChild(ix)) { // A grand
+ // children
+ if (compare(ix, maxIx) < 0) {
+ swap(maxIx, ix);
+ int p = getParentIx(maxIx);
+ if (p != NOT_EXIST && compare(maxIx, p) < 0) {
+ swap(maxIx, p);
+ }
+ trickleDownMax(maxIx);
+ }
+ } else { // A children
+ if (compare(ix, maxIx) < 0) {
+ swap(ix, maxIx);
+ }
+ }
+ }
- private int getRightChild(int i) {
- int rc = 2 * i + 2;
- return rc < tree.size() ? rc : -1;
- }
+ /**
+  * Pushes the node at index ix down from a min level: it is exchanged with
+  * the smallest of its children and grandchildren whenever that descendant
+  * compares smaller, and the process repeats from the grandchild's position.
+  *
+  * @param ix index (in tree) of the node to move down
+  */
+ private void trickleDownMin(int ix) {
+     int current = ix;
+     while (true) {
+         final int smallest = getMinOfDescendents(current);
+         if (smallest == NOT_EXIST) {
+             return;
+         }
+         // Grandchildren always have larger indexes than both children.
+         final boolean isGrandChild = smallest > getLeftChild(current) && smallest > getRightChild(current);
+         if (compare(smallest, current) >= 0) {
+             // Already at least as small as every descendant considered.
+             return;
+         }
+         swap(smallest, current);
+         if (!isGrandChild) {
+             // A direct child was the smallest: one exchange is enough.
+             return;
+         }
+         // The value that moved down may now be larger than its new parent,
+         // which sits on a max level; fix that before continuing.
+         final int parentIx = getParentIx(smallest);
+         if (parentIx != NOT_EXIST && compare(parentIx, smallest) < 0) {
+             swap(smallest, parentIx);
+         }
+         current = smallest;
+     }
+ }
- private int[] getRightGrandChildren(int i) {
- int rc = getRightChild(i);
- return rc != NOT_EXIST ? new int[] { getLeftChild(rc),
- getRightChild(rc) } : new int[] { NOT_EXIST, NOT_EXIST };
- }
+ /**
+  * Finds the smallest node among the children and grandchildren of ix.
+  *
+  * @param ix index (in tree) of the ancestor node
+  * @return index of the smallest descendant within two levels; the left
+  *         child when it is the only child; NOT_EXIST when ix has no child
+  */
+ private int getMinOfDescendents(int ix) {
+     final int left = getLeftChild(ix);
+     if (left == NOT_EXIST) {
+         return NOT_EXIST;
+     }
+     final int right = getRightChild(ix);
+     if (right == NOT_EXIST) {
+         return left;
+     }
+     int best = left;
+     if (compare(left, right) >= 0) {
+         best = right;
+     }
+     final int[] underLeft = getLeftGrandChildren(ix);
+     final int[] underRight = getRightGrandChildren(ix);
+     // Visit order is kept (left/right interleaved) so ties resolve the same way.
+     final int[] candidates = { underLeft[0], underRight[0], underLeft[1], underRight[1] };
+     for (int candidate : candidates) {
+         if (candidate != NOT_EXIST && compare(candidate, best) < 0) {
+             best = candidate;
+         }
+     }
+     return best;
+ }
- private boolean isAtMinLevel(int i) {
- int l = getLevel(i);
- return l % 2 == 0 ? true : false;
- }
+ /**
+  * Finds the largest node among the children and grandchildren of ix.
+  *
+  * @param ix index (in tree) of the ancestor node
+  * @return index of the largest descendant within two levels; the left
+  *         child when it is the only child; NOT_EXIST when ix has no child
+  */
+ private int getMaxOfDescendents(int ix) {
+     final int left = getLeftChild(ix);
+     if (left == NOT_EXIST) {
+         return NOT_EXIST;
+     }
+     final int right = getRightChild(ix);
+     if (right == NOT_EXIST) {
+         return left;
+     }
+     int best = left;
+     if (compare(left, right) < 0) {
+         best = right;
+     }
+     final int[] underLeft = getLeftGrandChildren(ix);
+     final int[] underRight = getRightGrandChildren(ix);
+     // Visit order is kept (left/right interleaved) so ties resolve the same way.
+     final int[] candidates = { underLeft[0], underRight[0], underLeft[1], underRight[1] };
+     for (int candidate : candidates) {
+         if (candidate != NOT_EXIST && compare(best, candidate) < 0) {
+             best = candidate;
+         }
+     }
+     return best;
+ }
- private int getLevel(int i) {
- if (i == 0) {
- return 0;
- }
- int l = (int) Math.floor(Math.log(i) / Math.log(2));
- if (i == (((int) Math.pow(2, (l + 1))) - 1)) {
- return (l + 1);
- }
- return l;
- }
+ /**
+  * Exchanges the entries stored at the two given positions of tree.
+  *
+  * @param n1Ix index of the first entry
+  * @param n2Ix index of the second entry
+  */
+ private void swap(int n1Ix, int n2Ix) {
+     final int[] first = tree.get(n1Ix);
+     final int[] second = tree.get(n2Ix);
+     tree.set(n1Ix, second);
+     tree.set(n2Ix, first);
+ }
- private ByteBuffer getFrame(int frameIx) {
- return (memMgr.getFrame(frameIx));
- }
+ /**
+  * Returns the index of the parent of node i in the array-backed tree.
+  *
+  * @param i index of the node
+  * @return (i - 1) / 2, or NOT_EXIST for the root (i == 0)
+  */
+ private int getParentIx(int i) {
+     return (i == 0) ? NOT_EXIST : (i - 1) / 2;
+ }
- // first < sec : -1
- private int compare(int nodeSIx1, int nodeSIx2) {
- int[] n1 = tree.get(nodeSIx1);
- int[] n2 = tree.get(nodeSIx2);
- return (compare(n1, n2));
- }
+ /**
+  * Returns the index of the grandparent of node i.
+  *
+  * @param i index of the node
+  * @return the parent of i's parent, or NOT_EXIST when i has no parent or
+  *         its parent is the root
+  */
+ private int getGrandParent(int i) {
+     int p = getParentIx(i);
+     // getParentIx reports a missing parent with NOT_EXIST, so test against
+     // that sentinel rather than a literal.
+     return p != NOT_EXIST ? getParentIx(p) : NOT_EXIST;
+ }
- // first < sec : -1
- private int compare(int[] n1, int[] n2) {
- // Compare Run Numbers
- if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
- return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
- }
+ /**
+  * Returns the index of the left child of node i.
+  *
+  * @param i index of the node
+  * @return 2 * i + 1 when that position is inside tree, otherwise NOT_EXIST
+  */
+ private int getLeftChild(int i) {
+     final int childIx = 2 * i + 1;
+     if (childIx >= tree.size()) {
+         return NOT_EXIST;
+     }
+     return childIx;
+ }
- // Compare Poor man Normalized Keys
- if (n1[PNK_IX] != n2[PNK_IX]) {
- return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1
- : 1;
- }
+ /**
+  * Returns the indexes of the two children of i's left child.
+  *
+  * @param i index of the node
+  * @return a two-element array {left, right}; a missing grandchild (or a
+  *         missing left child) is reported as NOT_EXIST in its slot
+  */
+ private int[] getLeftGrandChildren(int i) {
+     final int leftChild = getLeftChild(i);
+     if (leftChild == NOT_EXIST) {
+         return new int[] { NOT_EXIST, NOT_EXIST };
+     }
+     return new int[] { getLeftChild(leftChild), getRightChild(leftChild) };
+ }
- return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]),
- n1[OFFSET_IX], n2[OFFSET_IX]);
- }
+ /**
+  * Returns the index of the right child of node i.
+  *
+  * @param i index of the node
+  * @return 2 * i + 2 when that position is inside tree, otherwise NOT_EXIST
+  */
+ private int getRightChild(int i) {
+     int rc = 2 * i + 2;
+     // Callers test the result against NOT_EXIST, so report a missing child
+     // with that sentinel (as getLeftChild does) rather than a literal.
+     return rc < tree.size() ? rc : NOT_EXIST;
+ }
- private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset,
- int r2StartOffset) {
- byte[] b1 = fr1.array();
- byte[] b2 = fr2.array();
- fta1.reset(fr1);
- fta2.reset(fr2);
- int headerLen = BSTNodeUtil.HEADER_SIZE;
- r1StartOffset += headerLen;
- r2StartOffset += headerLen;
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1)
- * 4);
- int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
- int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
- * 4);
- int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
- int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
+ /**
+  * Returns the indexes of the two children of i's right child.
+  *
+  * @param i index of the node
+  * @return a two-element array {left, right}; a missing grandchild (or a
+  *         missing right child) is reported as NOT_EXIST in its slot
+  */
+ private int[] getRightGrandChildren(int i) {
+     final int rightChild = getRightChild(i);
+     if (rightChild == NOT_EXIST) {
+         return new int[] { NOT_EXIST, NOT_EXIST };
+     }
+     return new int[] { getLeftChild(rightChild), getRightChild(rightChild) };
+ }
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ /**
+  * Tells whether node i is on a min level, i.e. a level with an even number.
+  *
+  * @param i index of the node
+  * @return true when getLevel(i) is even, false otherwise
+  */
+ private boolean isAtMinLevel(int i) {
+     return getLevel(i) % 2 == 0;
+ }
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
+ /**
+  * Returns the level (depth) of index i in the array-backed binary tree:
+  * index 0 is level 0, indexes 1-2 are level 1, indexes 3-6 are level 2,
+  * and so on.
+  *
+  * Level k holds the indexes 2^k - 1 .. 2^(k+1) - 2, so the level equals
+  * floor(log2(i + 1)). It is read from the position of the highest set bit
+  * of i + 1, which is exact, instead of a ratio of floating-point
+  * logarithms, which is subject to rounding.
+  *
+  * @param i index of the node; expected to be a valid (non-negative) index
+  * @return the level of i; 0 for the root and for any non-positive input
+  */
+ private int getLevel(int i) {
+     if (i <= 0) {
+         return 0;
+     }
+     // For a positive int v, 31 - numberOfLeadingZeros(v) == floor(log2(v)).
+     return 31 - Integer.numberOfLeadingZeros(i + 1);
+ }
- public String _debugPrintTree() {
- String s = "";
- for (int i = 0; i < tree.size(); i++) {
- int[] n = tree.get(i);
- s += "\t[" + i + "](" + n[RUN_ID_IX] + ", " + n[FRAME_IX] + ", "
- + n[OFFSET_IX] + "), ";
- }
- return s;
- }
+ /**
+  * Fetches a frame buffer from the memory manager.
+  *
+  * @param frameIx index of the frame, passed through to memMgr.getFrame
+  * @return the buffer returned by memMgr for that index
+  */
+ private ByteBuffer getFrame(int frameIx) {
+     final ByteBuffer frame = memMgr.getFrame(frameIx);
+     return frame;
+ }
+
+ /**
+  * Compares the two entries stored at the given positions of tree.
+  *
+  * @param nodeSIx1 index of the first entry
+  * @param nodeSIx2 index of the second entry
+  * @return a negative value when the first entry orders before the second,
+  *         0 when they are equal, a positive value otherwise
+  */
+ private int compare(int nodeSIx1, int nodeSIx2) {
+     final int[] first = tree.get(nodeSIx1);
+     final int[] second = tree.get(nodeSIx2);
+     return compare(first, second);
+ }
+
+ /**
+  * Compares two heap entries. The comparison is decided, in order, by:
+  * the run id, then the poor man's normalized key (treated as an unsigned
+  * 32-bit value), and finally the records themselves inside their frames.
+  *
+  * @param n1 first entry
+  * @param n2 second entry
+  * @return a negative value when n1 orders before n2, 0 when they are
+  *         equal, a positive value otherwise
+  */
+ private int compare(int[] n1, int[] n2) {
+     // Entries of an earlier run always come first.
+     final int run1 = n1[RUN_ID_IX];
+     final int run2 = n2[RUN_ID_IX];
+     if (run1 != run2) {
+         return (run1 < run2) ? -1 : 1;
+     }
+
+     // Same run: try to decide on the normalized key, compared unsigned.
+     if (n1[PNK_IX] != n2[PNK_IX]) {
+         final long key1 = ((long) n1[PNK_IX]) & 0xffffffffL;
+         final long key2 = ((long) n2[PNK_IX]) & 0xffffffffL;
+         return (key1 < key2) ? -1 : 1;
+     }
+
+     // Keys tie: fall back to comparing the actual records.
+     final ByteBuffer frame1 = getFrame(n1[FRAME_IX]);
+     final ByteBuffer frame2 = getFrame(n2[FRAME_IX]);
+     return compare(frame1, frame2, n1[OFFSET_IX], n2[OFFSET_IX]);
+ }
+
+ /**
+  * Compares two records field by field, using comparators[f] on the field
+  * sortFields[f], and stops at the first field that differs.
+  *
+  * Each record is read BSTNodeUtil.HEADER_SIZE bytes after its slot offset.
+  * There, the int at position fIdx * 4 is taken as the end offset of field
+  * fIdx (the previous field's end, or 0, is its start), and the field bytes
+  * are located after the field-slot area whose length the accessors report.
+  *
+  * @param fr1 frame holding the first record
+  * @param fr2 frame holding the second record
+  * @param r1StartOffset offset in fr1 of the slot holding the first record
+  * @param r2StartOffset offset in fr2 of the slot holding the second record
+  * @return the first non-zero comparator result, or 0 when all sort fields
+  *         compare equal
+  */
+ private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset) {
+     final byte[] bytes1 = fr1.array();
+     final byte[] bytes2 = fr2.array();
+     fta1.reset(fr1);
+     fta2.reset(fr2);
+     final int headerSize = BSTNodeUtil.HEADER_SIZE;
+     final int tuple1 = r1StartOffset + headerSize;
+     final int tuple2 = r2StartOffset + headerSize;
+     for (int k = 0; k < comparators.length; ++k) {
+         final int field = sortFields[k];
+
+         final int start1 = (field == 0) ? 0 : fr1.getInt(tuple1 + (field - 1) * 4);
+         final int end1 = fr1.getInt(tuple1 + field * 4);
+         final int dataAt1 = tuple1 + fta1.getFieldSlotsLength() + start1;
+         final int length1 = end1 - start1;
+
+         final int start2 = (field == 0) ? 0 : fr2.getInt(tuple2 + (field - 1) * 4);
+         final int end2 = fr2.getInt(tuple2 + field * 4);
+         final int dataAt2 = tuple2 + fta2.getFieldSlotsLength() + start2;
+         final int length2 = end2 - start2;
+
+         final int result = comparators[k].compare(bytes1, dataAt1, length1, bytes2, dataAt2, length2);
+         if (result != 0) {
+             return result;
+         }
+     }
+     return 0;
+ }
+
+ /**
+  * Builds a one-line dump of the heap for debugging. Every entry is printed
+  * in array order as "[index](runId, frameIx, offset), " preceded by a tab.
+  *
+  * @return the textual dump; an empty string when tree is empty
+  */
+ public String _debugPrintTree() {
+     // StringBuilder avoids re-copying the growing text on every iteration.
+     StringBuilder sb = new StringBuilder();
+     for (int i = 0; i < tree.size(); i++) {
+         int[] n = tree.get(i);
+         sb.append("\t[").append(i).append("](")
+                 .append(n[RUN_ID_IX]).append(", ")
+                 .append(n[FRAME_IX]).append(", ")
+                 .append(n[OFFSET_IX]).append("), ");
+     }
+     return sb.toString();
+ }
}
\ No newline at end of file