Classes for the implementation of External Sorting with Replacement Selection. Contains classes for Sorting (with and without limit) and memory management.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@818 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 7647e50..6573ffa 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -59,7 +59,23 @@
}
return false;
}
-
+
+ /*
+ * Appends one pre-serialized tuple into this frame. The byte range
+ * [offset, offset+length) must already contain the tuple's field-offset
+ * header; returns false when the frame has no room left.
+ */
+ public boolean append(byte[] bytes, int offset, int length){
+ if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= frameSize) { // 4 bytes for the tuple count + 4 per slot-table entry
+ System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset, length);
+ tupleDataEndOffset += length;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset); // record this tuple's end offset in the slot table (grows backward from frame end)
+ ++tupleCount;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount); // keep the on-frame tuple count in sync
+ return true;
+ }
+ return false;
+ }
+
public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
int length = tEndOffset - tStartOffset;
if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
new file mode 100644
index 0000000..28c3f34
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
@@ -0,0 +1,797 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * @author pouria Memory Manager based on Binary Search Tree (BST) Free slot
+ * size is the key for the BST nodes. Each node in BST shows a class of
+ * free slots, while all the free slots with the exact same length are
+ * stored as a LinkedList, whose head is a BST node. BST is not stored
+ * as a separate data structure, but the actual free spaces of the
+ * memory slots are used to hold BST nodes. Each BST node has the
+ * logical structure, defined in the BSTNodeUtil class.
+ *
+ */
+public class BSTMemMgr implements IMemoryManager {
+
+ private final IHyracksCommonContext ctx;
+ public static int FRAME_SIZE;
+
+ private ByteBuffer[] frames;
+ private ByteBuffer convertBuffer;
+ private Slot root;
+ private Slot result; // A reusable object to hold one node returned as
+ // methods result
+ private Slot insertSlot; // A reusable object to hold one node in the insert
+ // process
+ private Slot[] par_res;
+ private int lastFrame;
+
+ private static int _debug_free_slots = 0;
+ private static int _debug_tree_size = 0;
+ private int _debug_total_lookup_steps;
+ private int _debug_total_lookup_counts;
+ private int _debug_depth_counter;
+ private int _debug_max_depth;
+
+ public BSTMemMgr(IHyracksCommonContext ctx, int memSize) { // memSize = maximum number of frames this manager may hold
+ this.ctx = ctx;
+ FRAME_SIZE = ctx.getFrameSize(); // NOTE(review): mutable static set per-instance; BSTNodeUtil.TOTAL_FRAME_SIZE reads it once at class-load time - confirm initialization order
+ convertBuffer = ByteBuffer.allocate(4); // scratch buffer reused by BSTNodeUtil for int<->byte conversions
+ frames = new ByteBuffer[memSize]; // entries stay null until lazily allocated by addFrame()
+ lastFrame = -1; // no frame allocated yet
+ root = new Slot(); // empty BST
+ insertSlot = new Slot();
+ result = new Slot();
+ par_res = new Slot[] { new Slot(), new Slot() }; // reusable pair: [0] = parent, [1] = found node
+ _debug_total_lookup_counts = 0;
+ _debug_total_lookup_steps = 0;
+ }
+
+ /**
+ * Best-fit allocation of {@code length} bytes. The chosen slot is written into the caller-supplied {@code result};
+ * it is left unset when memory is exhausted, so callers must check it before use.
+ */
+ @Override
+ public void allocate(int length, Slot result) throws HyracksDataException {
+ _debug_total_lookup_counts++;
+ search(length, par_res); // par_res: [0] = parent, [1] = best-fit free node (null if none)
+ if (par_res[1].isNull()) {
+ addFrame(par_res); // try to grow memory by one frame
+ if (par_res[1].isNull()) { // frame budget exhausted; result stays unset
+ return;
+ }
+ }
+
+ int sl = BSTNodeUtil.getLength(par_res[1], frames, convertBuffer);
+ int acLen = BSTNodeUtil.getActualLength(length); // padded length actually consumed from the slot
+ if (shouldSplit(sl, acLen)) { // leftover is big enough to remain a usable free slot
+ int[] s = split(par_res[1], par_res[0], acLen);
+ int insertLen = BSTNodeUtil.getLength(s[2], s[3], frames,
+ convertBuffer);
+ insert(s[2], s[3], insertLen); // inserting second half of the split
+ // slot
+ BSTNodeUtil.setHeaderFooter(s[0], s[1], length, false, frames);
+ result.set(s[0], s[1]);
+ return;
+ }
+ allocate(par_res[1], par_res[0], length, result); // no split: hand out the whole slot (internal padding)
+ }
+
+ @Override
+ public int unallocate(Slot s) throws HyracksDataException { // frees slot s, coalescing with adjacent free slots in the same frame; returns the resulting free length
+ int usedLen = BSTNodeUtil.getLength(s, frames, convertBuffer); // length recorded in the slot header
+ int actualLen = BSTNodeUtil.getActualLength(usedLen); // padded length actually occupied
+ int fix = s.getFrameIx();
+ int off = s.getOffset();
+
+ int prevMemSlotFooter_off = ((off - BSTNodeUtil.HEADER_SIZE) >= 0 ? (off - BSTNodeUtil.HEADER_SIZE)
+ : BSTNodeUtil.INVALID_INDEX);
+ int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
+ int nextMemSlotHeader_off = (t < FRAME_SIZE ? t
+ : BSTNodeUtil.INVALID_INDEX);
+ // Remember: next and prev memory slots have the same frame index as the
+ // unallocating slot
+ if (!isNodeNull(fix, prevMemSlotFooter_off)
+ && BSTNodeUtil.isFree(fix, prevMemSlotFooter_off, frames)) { // left neighbor is free: merge with it (and possibly the right one too)
+ int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooter_off,
+ frames, convertBuffer);
+ removeFromList(fix, prevMemSlotFooter_off - leftLength
+ - BSTNodeUtil.HEADER_SIZE);
+ int concatLength = actualLen + leftLength + 2
+ * BSTNodeUtil.HEADER_SIZE;
+ if (!isNodeNull(fix, nextMemSlotHeader_off)
+ && BSTNodeUtil.isFree(fix, nextMemSlotHeader_off, frames)) { // three-way merge: left + this + right
+ removeFromList(fix, nextMemSlotHeader_off);
+ concatLength += BSTNodeUtil.getLength(fix,
+ nextMemSlotHeader_off, frames, convertBuffer)
+ + 2
+ * BSTNodeUtil.HEADER_SIZE;
+ }
+ insert(fix, prevMemSlotFooter_off - leftLength
+ - BSTNodeUtil.HEADER_SIZE, concatLength); // newly (merged)
+ // slot starts
+ // at the prev
+ // slot offset
+ return concatLength;
+
+ } else if (!isNodeNull(fix, nextMemSlotHeader_off)
+ && BSTNodeUtil.isFree(fix, nextMemSlotHeader_off, frames)) { // only the right neighbor is free
+ removeFromList(fix, nextMemSlotHeader_off);
+ int concatLength = actualLen
+ + BSTNodeUtil.getLength(fix, nextMemSlotHeader_off, frames,
+ convertBuffer) + 2 * BSTNodeUtil.HEADER_SIZE;
+ insert(fix, off, concatLength); // newly (merged) slot starts at the
+ // unallocating slot offset
+ return concatLength;
+ }
+ // unallocating slot is not merging with any neighbor
+ insert(fix, off, actualLen);
+ return actualLen;
+ }
+
+ @Override
+ public boolean readTuple(int frameIx, int offset, FrameTupleAppender dest) { // copies the tuple stored at (frameIx, offset) into dest; false when dest is full
+ int offToRead = offset + BSTNodeUtil.HEADER_SIZE; // tuple bytes start right after the slot header
+ int length = BSTNodeUtil.getLength(frameIx, offset, frames,
+ convertBuffer);
+ return dest.append(frames[frameIx].array(), offToRead, length);
+ }
+
+ @Override
+ public boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src,
+ int tIndex) { // copies tuple tIndex from src into the slot at (frameIx, offset); always returns true - the slot must already be large enough (no bounds check here)
+ int offToCopy = offset + BSTNodeUtil.HEADER_SIZE; // payload goes right after the slot header
+ int tStartOffset = src.getTupleStartOffset(tIndex);
+ int tEndOffset = src.getTupleEndOffset(tIndex);
+ int tupleLength = tEndOffset - tStartOffset;
+ ByteBuffer srcBuffer = src.getBuffer();
+ System.arraycopy(srcBuffer.array(), tStartOffset,
+ frames[frameIx].array(), offToCopy, tupleLength);
+ return true;
+ }
+
+ @Override
+ public ByteBuffer getFrame(int frameIndex) { // raw access to a managed frame; null if that frame was never allocated
+ return frames[frameIndex];
+ }
+
+ public String _debug_getAvgSearchPath() { // debug-only: summarizes BST lookup statistics gathered by allocate()/search()
+ double avg = (((double) _debug_total_lookup_steps) / ((double) _debug_total_lookup_counts)); // NaN/Infinity if no allocation has happened yet
+ return "\nTotal allocation requests:\t" + _debug_total_lookup_counts
+ + "\nAvg Allocation Path Length:\t" + avg
+ + "\nMax BST Depth:\t" + _debug_max_depth;
+ }
+
+ public void _debug_decLookupCount() { // debug-only: lets callers cancel a counted allocation request
+ _debug_total_lookup_counts--;
+ }
+
+ /**
+ * Grabs one more frame from the context (if the frame budget allows),
+ * formats it as a single maximal free slot, and links it into the BST.
+ * @param p_r caller-supplied container: [0] = parent, [1] = the new node;
+ * both stay null when no more frames may be allocated
+ * @throws HyracksDataException if the new node cannot be linked in
+ */
+ private void addFrame(Slot[] p_r) throws HyracksDataException {
+ _debug_depth_counter = 0;
+ clear(p_r);
+ if ((lastFrame + 1) >= frames.length) { // frame budget exhausted
+ return;
+ }
+ frames[++lastFrame] = allocateFrame();
+ int l = FRAME_SIZE - 2 * BSTNodeUtil.HEADER_SIZE; // usable length of a whole-frame free slot
+ BSTNodeUtil.setHeaderFooter(lastFrame, 0, l, true, frames);
+ initNewNode(lastFrame, 0);
+
+ p_r[1].copy(root);
+ if (p_r[1].isNull()) { // root is null
+ root.set(lastFrame, 0);
+ initNewNode(root.getFrameIx(), root.getOffset());
+ p_r[1].copy(root);
+ return;
+ }
+
+ while (!p_r[1].isNull()) { // standard BST descent keyed on free-slot length
+ _debug_depth_counter++;
+ if (BSTNodeUtil.getLength(p_r[1], frames, convertBuffer) == l) { // same-length class exists: append to its list
+ append(p_r[1].getFrameIx(), p_r[1].getOffset(), lastFrame, 0);
+ p_r[1].set(lastFrame, 0);
+ if (_debug_depth_counter > _debug_max_depth) {
+ _debug_max_depth = _debug_depth_counter;
+ }
+ return;
+ }
+ if (l < BSTNodeUtil.getLength(p_r[1], frames, convertBuffer)) { // go left
+ if (isNodeNull(BSTNodeUtil.leftChild_fIx(p_r[1], frames,
+ convertBuffer), BSTNodeUtil.leftChild_offset(p_r[1],
+ frames, convertBuffer))) {
+ BSTNodeUtil.setLeftChild(p_r[1].getFrameIx(),
+ p_r[1].getOffset(), lastFrame, 0, frames);
+ p_r[0].copy(p_r[1]);
+ p_r[1].set(lastFrame, 0);
+ if (_debug_depth_counter > _debug_max_depth) {
+ _debug_max_depth = _debug_depth_counter;
+ }
+ return;
+ } else {
+ p_r[0].copy(p_r[1]);
+ p_r[1].set(BSTNodeUtil.leftChild_fIx(p_r[1], frames,
+ convertBuffer), BSTNodeUtil.leftChild_offset(
+ p_r[1], frames, convertBuffer));
+ }
+ } else { // go right
+ if (isNodeNull(BSTNodeUtil.rightChild_fIx(p_r[1], frames,
+ convertBuffer), BSTNodeUtil.rightChild_offset(p_r[1],
+ frames, convertBuffer))) {
+ BSTNodeUtil.setRightChild(p_r[1].getFrameIx(),
+ p_r[1].getOffset(), lastFrame, 0, frames);
+ p_r[0].copy(p_r[1]);
+ p_r[1].set(lastFrame, 0);
+ if (_debug_depth_counter > _debug_max_depth) {
+ _debug_max_depth = _debug_depth_counter;
+ }
+ return;
+ } else {
+ p_r[0].copy(p_r[1]);
+ p_r[1].set(BSTNodeUtil.rightChild_fIx(p_r[1], frames,
+ convertBuffer), BSTNodeUtil.rightChild_offset(
+ p_r[1], frames, convertBuffer));
+ }
+ }
+ }
+ throw new HyracksDataException(
+ "New Frame could not be added to BSTMemMgr");
+ }
+
+ private void insert(int fix, int off, int length) // marks (fix,off) as a free slot of the given length and links it into the BST (or into an existing equal-length list)
+ throws HyracksDataException {
+ _debug_depth_counter = 0;
+ BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
+ initNewNode(fix, off);
+
+ if (root.isNull()) {
+ root.set(fix, off);
+ return;
+ }
+
+ insertSlot.clear();
+ insertSlot.copy(root);
+ while (!insertSlot.isNull()) { // standard BST descent keyed on free-slot length
+ _debug_depth_counter++;
+ int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames,
+ convertBuffer);
+ if (curSlotLen == length) { // equal-length class already in the tree: join its list
+ append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix,
+ off);
+ if (_debug_depth_counter > _debug_max_depth) {
+ _debug_max_depth = _debug_depth_counter;
+ }
+ return;
+ }
+ if (length < curSlotLen) {
+ int lc_fix = BSTNodeUtil.leftChild_fIx(insertSlot, frames,
+ convertBuffer);
+ int lc_off = BSTNodeUtil.leftChild_offset(insertSlot, frames,
+ convertBuffer);
+ if (isNodeNull(lc_fix, lc_off)) {
+ initNewNode(fix, off);
+ BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(),
+ insertSlot.getOffset(), fix, off, frames);
+ if (_debug_depth_counter > _debug_max_depth) {
+ _debug_max_depth = _debug_depth_counter;
+ }
+ return;
+ } else {
+ insertSlot.set(lc_fix, lc_off);
+ }
+ } else {
+ int rc_fix = BSTNodeUtil.rightChild_fIx(insertSlot, frames,
+ convertBuffer);
+ int rc_off = BSTNodeUtil.rightChild_offset(insertSlot, frames,
+ convertBuffer);
+ if (isNodeNull(rc_fix, rc_off)) {
+ initNewNode(fix, off);
+ BSTNodeUtil.setRightChild(insertSlot.getFrameIx(),
+ insertSlot.getOffset(), fix, off, frames);
+ if (_debug_depth_counter > _debug_max_depth) {
+ _debug_max_depth = _debug_depth_counter;
+ }
+ return;
+ } else {
+ insertSlot.set(rc_fix, rc_off);
+ }
+ }
+ }
+ throw new HyracksDataException(
+ "Failure in node insertion into BST in BSTMemMgr");
+ }
+
+ /**
+ * Best-fit lookup: finds a free slot of exactly {@code length} bytes, or
+ * else the smallest slot larger than it (the last left turn taken).
+ * Results land in {@code target}: [0] = parent, [1] = node (null if none).
+ */
+ private void search(int length, Slot[] target) {
+ clear(target);
+ result.clear();
+
+ if (root.isNull()) {
+ return;
+ }
+
+ Slot lastLeftParent = new Slot(); // parent of the last node we turned left at
+ Slot lastLeft = new Slot(); // last node we turned left at = best over-sized candidate
+ Slot parent = new Slot();
+ result.copy(root);
+
+ while (!result.isNull()) {
+ _debug_total_lookup_steps++;
+ if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) { // exact match
+ target[0].copy(parent);
+ target[1].copy(result);
+ return;
+ }
+ if (length < BSTNodeUtil.getLength(result, frames, convertBuffer)) { // current fits; remember it and look for a tighter one on the left
+ lastLeftParent.copy(parent);
+ lastLeft.copy(result);
+ parent.copy(result);
+ int fix = BSTNodeUtil.leftChild_fIx(result, frames,
+ convertBuffer);
+ int off = BSTNodeUtil.leftChild_offset(result, frames,
+ convertBuffer);
+ result.set(fix, off);
+ } else {
+ parent.copy(result);
+ int fix = BSTNodeUtil.rightChild_fIx(result, frames,
+ convertBuffer);
+ int off = BSTNodeUtil.rightChild_offset(result, frames,
+ convertBuffer);
+ result.set(fix, off);
+ }
+ }
+
+ target[0].copy(lastLeftParent);
+ target[1].copy(lastLeft);
+
+ }
+
+ private void append(int headFix, int headOff, int nodeFix, int nodeOff) { // links (nodeFix,nodeOff) into the equal-length free-slot list, directly after its head node
+ initNewNode(nodeFix, nodeOff);
+
+ int fix = BSTNodeUtil.next_fIx(headFix, headOff, frames, convertBuffer); // frameIx
+ // for
+ // the
+ // current
+ // next
+ // of
+ // head
+ int off = BSTNodeUtil.next_offset(headFix, headOff, frames,
+ convertBuffer); // offset for the current next of head
+ BSTNodeUtil.setNext(nodeFix, nodeOff, fix, off, frames);
+
+ if (!isNodeNull(fix, off)) {
+ BSTNodeUtil.setPrev(fix, off, nodeFix, nodeOff, frames);
+ }
+ BSTNodeUtil.setPrev(nodeFix, nodeOff, headFix, headOff, frames);
+ BSTNodeUtil.setNext(headFix, headOff, nodeFix, nodeOff, frames);
+ }
+
+ private int[] split(Slot listHead, Slot parent, int length) { // carves length bytes off a free slot; returns {allocFix, allocOff, leftoverFix, leftoverOff}
+ int l2 = BSTNodeUtil.getLength(listHead, frames, convertBuffer) // leftover free length after carving out length plus one header/footer pair
+ - length - 2 * BSTNodeUtil.HEADER_SIZE;
+ // We split the node after slots-list head (keeps the BST shape intact)
+ if (!isNodeNull(BSTNodeUtil.next_fIx(listHead, frames, convertBuffer),
+ BSTNodeUtil.next_offset(listHead, frames, convertBuffer))) {
+ int afterHeadFix = BSTNodeUtil.next_fIx(listHead, frames,
+ convertBuffer);
+ int afterHeadOff = BSTNodeUtil.next_offset(listHead, frames,
+ convertBuffer);
+ int afHNextFix = BSTNodeUtil.next_fIx(afterHeadFix, afterHeadOff,
+ frames, convertBuffer);
+ int afHNextOff = BSTNodeUtil.next_offset(afterHeadFix,
+ afterHeadOff, frames, convertBuffer);
+ BSTNodeUtil.setNext(listHead.getFrameIx(), listHead.getOffset(), // unlink the split node from the list
+ afHNextFix, afHNextOff, frames);
+ if (!isNodeNull(afHNextFix, afHNextOff)) {
+ BSTNodeUtil.setPrev(afHNextFix, afHNextOff,
+ listHead.getFrameIx(), listHead.getOffset(), frames);
+ }
+ int secondOffset = afterHeadOff + length + 2
+ * BSTNodeUtil.HEADER_SIZE;
+ BSTNodeUtil.setHeaderFooter(afterHeadFix, afterHeadOff, length,
+ true, frames);
+ BSTNodeUtil.setHeaderFooter(afterHeadFix, secondOffset, l2, true,
+ frames);
+
+ return new int[] { afterHeadFix, afterHeadOff, afterHeadFix,
+ secondOffset };
+ }
+ // We split the head (the only member), so the BST must be re-linked
+ int secondOffset = listHead.getOffset() + length + 2
+ * BSTNodeUtil.HEADER_SIZE;
+ BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(),
+ listHead.getOffset(), length, true, frames);
+ BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), secondOffset, l2,
+ true, frames);
+
+ fixTreePtrs(listHead.getFrameIx(), listHead.getOffset(),
+ parent.getFrameIx(), parent.getOffset());
+ return new int[] { listHead.getFrameIx(), listHead.getOffset(),
+ listHead.getFrameIx(), secondOffset };
+ }
+
+ private void fixTreePtrs(int node_fix, int node_off, int par_fix, // standard BST node-removal: detaches (node_fix,node_off) and re-attaches its children (or in-order successor) to the parent / root
+ int par_off) {
+ int nlc_fix = BSTNodeUtil.leftChild_fIx(node_fix, node_off, frames,
+ convertBuffer);
+ int nlc_off = BSTNodeUtil.leftChild_offset(node_fix, node_off, frames,
+ convertBuffer);
+ int nrc_fix = BSTNodeUtil.rightChild_fIx(node_fix, node_off, frames,
+ convertBuffer);
+ int nrc_off = BSTNodeUtil.rightChild_offset(node_fix, node_off, frames,
+ convertBuffer);
+
+ int status = -1; // (status==0 if node is left child of parent)
+ // (status==1 if node is right child of parent)
+ if (!isNodeNull(par_fix, par_off)) {
+ int nlen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
+ node_fix, node_off, frames, convertBuffer));
+ int plen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
+ par_fix, par_off, frames, convertBuffer));
+ status = ((nlen < plen) ? 0 : 1); // side is inferred from the length key, not from pointer equality
+ }
+
+ if (!isNodeNull(nlc_fix, nlc_off) && !isNodeNull(nrc_fix, nrc_off)) { // Node
+ // has
+ // two
+ // children
+ int pmin_fix = node_fix;
+ int pmin_off = node_off;
+ int min_fix = nrc_fix;
+ int min_off = nrc_off;
+ int next_left_fix = BSTNodeUtil.leftChild_fIx(min_fix, min_off,
+ frames, convertBuffer);
+ int next_left_off = BSTNodeUtil.leftChild_offset(min_fix, min_off,
+ frames, convertBuffer);
+
+ while (!isNodeNull(next_left_fix, next_left_off)) { // find the in-order successor: leftmost node of the right subtree
+ pmin_fix = min_fix;
+ pmin_off = min_off;
+ min_fix = next_left_fix;
+ min_off = next_left_off;
+ next_left_fix = BSTNodeUtil.leftChild_fIx(min_fix, min_off,
+ frames, convertBuffer); // min is now pointing to
+ // current (old) next left
+ next_left_off = BSTNodeUtil.leftChild_offset(min_fix, min_off,
+ frames, convertBuffer); // min is now pointing to
+ // current (old) next left
+ }
+
+ if ((nrc_fix == min_fix) && (nrc_off == min_off)) { // nrc is the
+ // same as min
+ BSTNodeUtil.setLeftChild(nrc_fix, nrc_off, nlc_fix, nlc_off,
+ frames);
+ } else { // min is different from nrc
+ int min_right_fix = BSTNodeUtil.rightChild_fIx(min_fix,
+ min_off, frames, convertBuffer);
+ int min_right_off = BSTNodeUtil.rightChild_offset(min_fix,
+ min_off, frames, convertBuffer);
+ BSTNodeUtil.setRightChild(min_fix, min_off, nrc_fix, nrc_off,
+ frames);
+ BSTNodeUtil.setLeftChild(min_fix, min_off, nlc_fix, nlc_off,
+ frames);
+ BSTNodeUtil.setLeftChild(pmin_fix, pmin_off, min_right_fix,
+ min_right_off, frames);
+ }
+
+ // Now dealing with the parent
+ if (!isNodeNull(par_fix, par_off)) {
+ if (status == 0) {
+ BSTNodeUtil.setLeftChild(par_fix, par_off, min_fix,
+ min_off, frames);
+ } else if (status == 1) {
+ BSTNodeUtil.setRightChild(par_fix, par_off, min_fix,
+ min_off, frames);
+ }
+ } else { // No parent (node was the root)
+ root.set(min_fix, min_off);
+ }
+ return;
+ }
+
+ else if (!isNodeNull(nlc_fix, nlc_off)) { // Node has only left child
+ if (status == 0) {
+ BSTNodeUtil.setLeftChild(par_fix, par_off, nlc_fix, nlc_off,
+ frames);
+ } else if (status == 1) {
+ BSTNodeUtil.setRightChild(par_fix, par_off, nlc_fix, nlc_off,
+ frames);
+ } else if (status == -1) { // No parent, so node is root
+ root.set(nlc_fix, nlc_off);
+ }
+ return;
+ }
+
+ else if (!isNodeNull(nrc_fix, nrc_off)) { // Node has only right child
+ if (status == 0) {
+ BSTNodeUtil.setLeftChild(par_fix, par_off, nrc_fix, nrc_off,
+ frames);
+ } else if (status == 1) {
+ BSTNodeUtil.setRightChild(par_fix, par_off, nrc_fix, nrc_off,
+ frames);
+ } else if (status == -1) { // No parent, so node is root
+ root.set(nrc_fix, nrc_off);
+ }
+ return;
+ }
+
+ else { // Node is leaf (no children)
+ if (status == 0) {
+ BSTNodeUtil.setLeftChild(par_fix, par_off,
+ BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX,
+ frames);
+ } else if (status == 1) {
+ BSTNodeUtil.setRightChild(par_fix, par_off,
+ BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX,
+ frames);
+ } else { // node was the only node in the tree
+ root.clear();
+ }
+ return;
+ }
+ }
+
+ /**
+ * Allocates a whole free slot (no split; the surplus becomes internal
+ * padding). Prefers a non-head list member so the BST shape is untouched;
+ * otherwise takes the head itself and re-links the tree via fixTreePtrs.
+ * @param node head of the matching free-slot list
+ * @param parent BST parent of node (may be a null slot)
+ * @param result caller-supplied container receiving the allocated slot
+ */
+ private void allocate(Slot node, Slot parent, int length, Slot result) {
+ int nextFix = BSTNodeUtil.next_fIx(node, frames, convertBuffer);
+ int nextOff = BSTNodeUtil.next_offset(node, frames, convertBuffer);
+ if (!isNodeNull(nextFix, nextOff)) { // list has a second member: hand that one out and keep the head in the tree
+ int nextOfNext_fix = BSTNodeUtil.next_fIx(nextFix, nextOff, frames,
+ convertBuffer);
+ int nextOfNext_off = BSTNodeUtil.next_offset(nextFix, nextOff,
+ frames, convertBuffer);
+ BSTNodeUtil.setNext(node.getFrameIx(), node.getOffset(),
+ nextOfNext_fix, nextOfNext_off, frames);
+ if (!isNodeNull(nextOfNext_fix, nextOfNext_off)) {
+ BSTNodeUtil.setPrev(nextOfNext_fix, nextOfNext_off,
+ node.getFrameIx(), node.getOffset(), frames);
+ }
+ BSTNodeUtil
+ .setHeaderFooter(nextFix, nextOff, length, false, frames);
+ result.set(nextFix, nextOff);
+ return;
+ }
+
+ fixTreePtrs(node.getFrameIx(), node.getOffset(), parent.getFrameIx(), // head was the only member: remove it from the BST
+ parent.getOffset());
+ BSTNodeUtil.setHeaderFooter(node.getFrameIx(), node.getOffset(),
+ length, false, frames);
+ result.copy(node);
+ }
+
+ private void removeFromList(int fix, int off) { // unlinks the free slot at (fix,off) from its equal-length list; if it is the list head, also repairs the BST
+ int nx_fix = BSTNodeUtil.next_fIx(fix, off, frames, convertBuffer);
+ int nx_off = BSTNodeUtil.next_offset(fix, off, frames, convertBuffer);
+ int prev_fix = BSTNodeUtil.prev_fIx(fix, off, frames, convertBuffer);
+ int prev_off = BSTNodeUtil.prev_offset(fix, off, frames, convertBuffer);
+ if (!isNodeNull(prev_fix, prev_off) && !isNodeNull(nx_fix, nx_off)) { // middle of the list: simple unlink, tree untouched
+ BSTNodeUtil.setNext(prev_fix, prev_off, nx_fix, nx_off, frames);
+ BSTNodeUtil.setPrev(nx_fix, nx_off, prev_fix, prev_off, frames);
+ BSTNodeUtil.setNext(fix, off, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ return;
+ }
+ if (!isNodeNull(prev_fix, prev_off)) { // tail of the list
+ BSTNodeUtil.setNext(prev_fix, prev_off, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ return;
+ }
+
+ // We need to find the parent, so we can fix the tree
+ int par_fix = BSTNodeUtil.INVALID_INDEX;
+ int par_off = BSTNodeUtil.INVALID_INDEX;
+ int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(fix,
+ off, frames, convertBuffer));
+ fix = root.getFrameIx(); // re-descend from the root, keyed on length
+ off = root.getOffset();
+ int curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+ while (length != curLen) {
+ par_fix = fix;
+ par_off = off;
+ if (length < curLen) {
+ fix = BSTNodeUtil.leftChild_fIx(par_fix, par_off, frames,
+ convertBuffer); // par_fix is now the old(current) fix
+ off = BSTNodeUtil.leftChild_offset(par_fix, par_off, frames,
+ convertBuffer); // par_off is now the old(current) off
+ } else {
+ fix = BSTNodeUtil.rightChild_fIx(par_fix, par_off, frames,
+ convertBuffer); // par_fix is now the old(current) fix
+ off = BSTNodeUtil.rightChild_offset(par_fix, par_off, frames,
+ convertBuffer); // par_off is now the old(current) off
+ }
+ curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+ }
+
+ if (!isNodeNull(nx_fix, nx_off)) { // it is head of the list (in the
+ // tree): promote the next list member into the tree node's place
+ BSTNodeUtil.setPrev(nx_fix, nx_off, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ int node_lc_fix = BSTNodeUtil.leftChild_fIx(fix, off, frames,
+ convertBuffer);
+ int node_lc_off = BSTNodeUtil.leftChild_offset(fix, off, frames,
+ convertBuffer);
+ int node_rc_fix = BSTNodeUtil.rightChild_fIx(fix, off, frames,
+ convertBuffer);
+ int node_rc_off = BSTNodeUtil.rightChild_offset(fix, off, frames,
+ convertBuffer);
+ BSTNodeUtil.setLeftChild(nx_fix, nx_off, node_lc_fix, node_lc_off,
+ frames);
+ BSTNodeUtil.setRightChild(nx_fix, nx_off, node_rc_fix, node_rc_off,
+ frames);
+ if (!isNodeNull(par_fix, par_off)) {
+ int parentLength = BSTNodeUtil.getLength(par_fix, par_off,
+ frames, convertBuffer);
+ if (length < parentLength) {
+ BSTNodeUtil.setLeftChild(par_fix, par_off, nx_fix, nx_off,
+ frames);
+ } else {
+ BSTNodeUtil.setRightChild(par_fix, par_off, nx_fix, nx_off,
+ frames);
+ }
+ }
+
+ if ((root.getFrameIx() == fix) && (root.getOffset() == off)) {
+ root.set(nx_fix, nx_off);
+ }
+
+ return;
+ }
+
+ fixTreePtrs(fix, off, par_fix, par_off); // sole member of its class: delete the node from the BST
+ }
+
+ private void clear(Slot[] s) { // resets both entries of a (parent, node) result pair to the null slot
+ s[0].clear();
+ s[1].clear();
+ }
+
+ private boolean isNodeNull(int frameIx, int offset) { // a node is "null" if either coordinate is invalid or its frame was never allocated
+ return ((frameIx == BSTNodeUtil.INVALID_INDEX)
+ || (offset == BSTNodeUtil.INVALID_INDEX) || (frames[frameIx] == null));
+ }
+
+ private boolean shouldSplit(int slotLength, int reqLength) { // split only if the leftover can still form a minimally useful free slot
+ return ((slotLength - reqLength) >= BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE);
+ }
+
+ private void initNewNode(int frameIx, int offset) { // resets all four BST/list pointers of the node at (frameIx, offset) to invalid
+ BSTNodeUtil.setLeftChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setRightChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setNext(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ BSTNodeUtil.setPrev(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
+ BSTNodeUtil.INVALID_INDEX, frames);
+ }
+
+ private ByteBuffer allocateFrame() { // delegates physical frame allocation to the Hyracks context
+ return ctx.allocateFrame();
+ }
+
+ public String _debug_printMemory() { // debug-only: walks all slots frame by frame in physical order and dumps them
+ _debug_free_slots = 0;
+ Slot s = new Slot(0, 0);
+ if (s.isNull()) {
+ return "memory:\tNull";
+ }
+
+ if (BSTNodeUtil.isFree(0, 0, frames)) {
+ _debug_free_slots++;
+ }
+
+ String m = "memory:\n" + _debug_printSlot(0, 0) + "\n";
+ int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(0, 0,
+ frames, convertBuffer));
+ int noff = (length + 2 * BSTNodeUtil.HEADER_SIZE >= FRAME_SIZE ? BSTNodeUtil.INVALID_INDEX // INVALID here means "advance to the next frame"
+ : length + 2 * BSTNodeUtil.HEADER_SIZE);
+ int nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length == 1) ? BSTNodeUtil.INVALID_INDEX
+ : 1)
+ : 0);
+ if (noff == BSTNodeUtil.INVALID_INDEX
+ && nfix != BSTNodeUtil.INVALID_INDEX) {
+ noff = 0;
+ }
+ s.set(nfix, noff);
+ while (!isNodeNull(s.getFrameIx(), s.getOffset())) { // advance slot by slot until past the last frame
+ if (BSTNodeUtil.isFree(s.getFrameIx(), s.getOffset(), frames)) {
+ _debug_free_slots++;
+ }
+ m += _debug_printSlot(s.getFrameIx(), s.getOffset()) + "\n";
+ length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
+ s.getFrameIx(), s.getOffset(), frames, convertBuffer));
+ noff = (s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE >= FRAME_SIZE ? BSTNodeUtil.INVALID_INDEX
+ : s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE);
+ nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length - 1 == s
+ .getFrameIx()) ? BSTNodeUtil.INVALID_INDEX
+ : s.getFrameIx() + 1) : s.getFrameIx());
+ if (noff == BSTNodeUtil.INVALID_INDEX
+ && nfix != BSTNodeUtil.INVALID_INDEX) {
+ noff = 0;
+ }
+ s.set(nfix, noff);
+ }
+ return m + "\nFree Slots:\t" + _debug_free_slots;
+ }
+
+ public String _debug_printTree() { // debug-only: dumps the whole BST starting from the root
+ _debug_tree_size = 0;
+ Slot node = new Slot();
+ node.copy(root);
+ if (!node.isNull()) {
+ _debug_tree_size++;
+ return _debug_printSubTree(node) + "\nTree Nodes:\t"
+ + _debug_tree_size;
+ }
+ return "Null";
+ }
+
+ private String _debug_printSubTree(Slot r) { // debug-only: pre-order dump of the subtree rooted at r, with list links
+ Slot node = new Slot();
+ node.copy(r);
+ int fix = node.getFrameIx();
+ int off = node.getOffset();
+ int lfix = BSTNodeUtil.leftChild_fIx(node, frames, convertBuffer);
+ int loff = BSTNodeUtil.leftChild_offset(node, frames, convertBuffer);
+ int rfix = BSTNodeUtil.rightChild_fIx(node, frames, convertBuffer);
+ int roff = BSTNodeUtil.rightChild_offset(node, frames, convertBuffer);
+ int nfix = BSTNodeUtil.next_fIx(node, frames, convertBuffer);
+ int noff = BSTNodeUtil.next_offset(node, frames, convertBuffer);
+ int pfix = BSTNodeUtil.prev_fIx(node, frames, convertBuffer);
+ int poff = BSTNodeUtil.prev_offset(node, frames, convertBuffer);
+
+ String s = "{" + r.getFrameIx() + ", " + r.getOffset() + " (Len: "
+ + BSTNodeUtil.getLength(fix, off, frames, convertBuffer)
+ + ") - " + "(LC: " + _debug_printSlot(lfix, loff) + ") - "
+ + "(RC: " + _debug_printSlot(rfix, roff) + ") - " + "(NX: "
+ + _debug_printSlot(nfix, noff) + ") - " + "(PR: "
+ + _debug_printSlot(pfix, poff) + ") }\n";
+ if (!isNodeNull(lfix, loff)) {
+ _debug_tree_size++;
+ s += _debug_printSubTree(new Slot(lfix, loff)) + "\n";
+ }
+ if (!isNodeNull(rfix, roff)) {
+ _debug_tree_size++;
+ s += _debug_printSubTree(new Slot(rfix, roff)) + "\n";
+ }
+
+ return s;
+ }
+
+ private String _debug_printSlot(int fix, int off) { // debug-only: one-line description of the slot at (fix, off)
+ if (isNodeNull(fix, off)) {
+ return BSTNodeUtil.INVALID_INDEX + ", " + BSTNodeUtil.INVALID_INDEX;
+ }
+ int l = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+ int al = BSTNodeUtil.getActualLength(l);
+ boolean f = BSTNodeUtil.isFree(fix, off, frames);
+ return fix + ", " + off + " (free: " + f + ") (Len: " + l
+ + ") (actual len: " + al + ") ";
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
new file mode 100644
index 0000000..dc1bffa
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
@@ -0,0 +1,277 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author pouria
+ *
+ * Implements utility methods, used extensively and repeatedly within
+ * the BSTMemMgr.
+ *
+ * Main functionality includes methods to set/get different types of
+ * pointers, required and accessed within BST traversal, along with the
+ * methods for setting/getting length/header/footer of free slots, which
+ * have been used as the containers for BST nodes.
+ */
+public class BSTNodeUtil {
+
+ static final int TOTAL_FRAME_SIZE = BSTMemMgr.FRAME_SIZE;
+ static final int MINIMUM_FREE_SLOT_SIZE = 32;
+
+ // Sizes (in bytes) of the two halves of a stored pointer
+ static final int FRAME_PTR_SIZE = 4;
+ static final int OFFSET_SIZE = 2;
+
+ static final int HEADER_SIZE = 2;
+ static final int HEADER_INDEX = 0;
+
+ // Relative positions, within a free slot, of the four stored pointers:
+ // left child, right child, next and prev (each a frameIx/offset pair)
+ static final int LEFT_CHILD_FRAME_INDEX = HEADER_INDEX + HEADER_SIZE;
+ static final int LEFT_CHILD_OFFSET_INDEX = LEFT_CHILD_FRAME_INDEX
+ + FRAME_PTR_SIZE;
+
+ static final int RIGHT_CHILD_FRAME_INDEX = LEFT_CHILD_OFFSET_INDEX
+ + OFFSET_SIZE;
+ static final int RIGHT_CHILD_OFFSET_INDEX = RIGHT_CHILD_FRAME_INDEX
+ + FRAME_PTR_SIZE;
+
+ static final int NEXT_FRAME_INDEX = RIGHT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
+ static final int NEXT_OFFSET_INDEX = NEXT_FRAME_INDEX + FRAME_PTR_SIZE;
+
+ static final int PREV_FRAME_INDEX = NEXT_OFFSET_INDEX + OFFSET_SIZE;
+ static final int PREV_OFFSET_INDEX = PREV_FRAME_INDEX + FRAME_PTR_SIZE;
+
+ // A pointer whose first byte has its MSB set denotes null/invalid;
+ // MASK (0x7F) clears that bit when reading the length from a header
+ private static final byte INVALID = -128;
+ private static final byte MASK = 127;
+ static final int INVALID_INDEX = -1;
+
+ /*
+ * Structure of a free slot:
+ * [HEADER][LEFT_CHILD][RIGHT_CHILD][NEXT][PREV]...[FOOTER] MSB in the
+ * HEADER is set to 1 (invalid content)
+ *
+ * Structure of a used slot: [HEADER]...[FOOTER] MSB in the HEADER is set to
+ * 0 (valid content)
+ */
+
+ /* ---- Left-child pointer accessors ---- */
+
+ static int leftChild_fIx(Slot s, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return leftChild_fIx(s.getFrameIx(), s.getOffset(), frames,
+ convertBuffer);
+ }
+
+ static int leftChild_offset(Slot s, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return leftChild_offset(s.getFrameIx(), s.getOffset(), frames,
+ convertBuffer);
+ }
+
+ static int leftChild_fIx(int frameIx, int offset, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_FRAME_INDEX,
+ FRAME_PTR_SIZE, convertBuffer));
+
+ }
+
+ static int leftChild_offset(int frameIx, int offset, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx],
+ offset + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+ }
+
+ static void setLeftChild(Slot node, Slot lc, ByteBuffer[] frames) {
+ setLeftChild(node.getFrameIx(), node.getOffset(), lc.getFrameIx(),
+ lc.getOffset(), frames);
+ }
+
+ static void setLeftChild(int nodeFix, int nodeOff, int lcFix, int lcOff,
+ ByteBuffer[] frames) {
+ storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_FRAME_INDEX,
+ FRAME_PTR_SIZE, lcFix);
+ storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_OFFSET_INDEX,
+ OFFSET_SIZE, lcOff);
+ }
+
+ /* ---- Right-child pointer accessors ---- */
+
+ static int rightChild_fIx(Slot s, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return rightChild_fIx(s.getFrameIx(), s.getOffset(), frames,
+ convertBuffer);
+ }
+
+ static int rightChild_offset(Slot s, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return rightChild_offset(s.getFrameIx(), s.getOffset(), frames,
+ convertBuffer);
+ }
+
+ static int rightChild_fIx(int frameIx, int offset, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx],
+ offset + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
+ }
+
+ static int rightChild_offset(int frameIx, int offset, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset
+ + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+ }
+
+ static void setRightChild(Slot node, Slot rc, ByteBuffer[] frames) {
+ setRightChild(node.getFrameIx(), node.getOffset(), rc.getFrameIx(),
+ rc.getOffset(), frames);
+ }
+
+ static void setRightChild(int nodeFix, int nodeOff, int rcFix, int rcOff,
+ ByteBuffer[] frames) {
+ storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_FRAME_INDEX,
+ FRAME_PTR_SIZE, rcFix);
+ storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_OFFSET_INDEX,
+ OFFSET_SIZE, rcOff);
+ }
+
+ /* ---- Next pointer accessors (free-list linkage) ---- */
+
+ static int next_fIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return next_fIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
+
+ static int next_offset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return next_offset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
+
+ static int next_fIx(int frameIx, int offset, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + NEXT_FRAME_INDEX,
+ FRAME_PTR_SIZE, convertBuffer));
+ }
+
+ static int next_offset(int frameIx, int offset, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + NEXT_OFFSET_INDEX,
+ OFFSET_SIZE, convertBuffer));
+ }
+
+ static void setNext(Slot node, Slot next, ByteBuffer[] frames) {
+ // BUGFIX: the offset argument was node.getOffset(), which corrupted the
+ // stored next-pointer (cf. setPrev, which correctly uses prev's offset)
+ setNext(node.getFrameIx(), node.getOffset(), next.getFrameIx(),
+ next.getOffset(), frames);
+ }
+
+ static void setNext(int nodeFix, int nodeOff, int nFix, int nOff,
+ ByteBuffer[] frames) {
+ storeInt(frames[nodeFix], nodeOff + NEXT_FRAME_INDEX, FRAME_PTR_SIZE,
+ nFix);
+ storeInt(frames[nodeFix], nodeOff + NEXT_OFFSET_INDEX, OFFSET_SIZE,
+ nOff);
+ }
+
+ /* ---- Prev pointer accessors (free-list linkage) ---- */
+
+ static int prev_fIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return prev_fIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
+
+ static int prev_offset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return prev_offset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
+
+ static int prev_fIx(int frameIx, int offset, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + PREV_FRAME_INDEX,
+ FRAME_PTR_SIZE, convertBuffer));
+ }
+
+ static int prev_offset(int frameIx, int offset, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ return (retrieveAsInt(frames[frameIx], offset + PREV_OFFSET_INDEX,
+ OFFSET_SIZE, convertBuffer));
+ }
+
+ static void setPrev(Slot node, Slot prev, ByteBuffer[] frames) {
+ setPrev(node.getFrameIx(), node.getOffset(), prev.getFrameIx(),
+ prev.getOffset(), frames);
+ }
+
+ static void setPrev(int nodeFix, int nodeOff, int pFix, int pOff,
+ ByteBuffer[] frames) {
+ storeInt(frames[nodeFix], nodeOff + PREV_FRAME_INDEX, FRAME_PTR_SIZE,
+ pFix);
+ storeInt(frames[nodeFix], nodeOff + PREV_OFFSET_INDEX, OFFSET_SIZE,
+ pOff);
+ }
+
+ /** @return true iff both slots point at the same (frameIx, offset). */
+ static boolean slotsTheSame(Slot s, Slot t) {
+ return ((s.getFrameIx() == t.getFrameIx()) && (s.getOffset() == t
+ .getOffset()));
+ }
+
+ /**
+ * Writes the slot's length into both its header and footer, and marks
+ * both with the given free/used flag. The footer sits right after the
+ * (padded) payload so adjacent-slot coalescing can find it.
+ */
+ static void setHeaderFooter(int frameIx, int offset, int usedLength,
+ boolean isFree, ByteBuffer[] frames) {
+ int slotLength = getActualLength(usedLength);
+ int footerOffset = offset + HEADER_SIZE + slotLength;
+ storeInt(frames[frameIx], offset, HEADER_SIZE, usedLength);
+ storeInt(frames[frameIx], footerOffset, HEADER_SIZE, usedLength);
+ setFree(frameIx, offset, isFree, frames);
+ setFree(frameIx, footerOffset, isFree, frames);
+ }
+
+ static int getLength(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+ return getLength(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+ }
+
+ /** Reads the slot length from its header, ignoring the free-flag MSB. */
+ static int getLength(int frameIx, int offset, ByteBuffer[] frames,
+ ByteBuffer convertBuffer) {
+ convertBuffer.clear();
+ for (int i = 0; i < 4 - HEADER_SIZE; i++) { // padding
+ convertBuffer.put(i, (byte) 0x00);
+ }
+
+ convertBuffer.put(4 - HEADER_SIZE,
+ (byte) ((frames[frameIx].get(offset)) & (MASK)));
+ System.arraycopy(frames[frameIx].array(), offset + 1,
+ convertBuffer.array(), 5 - HEADER_SIZE, HEADER_SIZE - 1);
+ return convertBuffer.getInt(0);
+ }
+
+ // MSB equal to 1 means FREE
+ static boolean isFree(int frameIx, int offset, ByteBuffer[] frames) {
+ return ((((frames[frameIx]).array()[offset]) & 0x80) == 0x80);
+ }
+
+ static void setFree(int frameIx, int offset, boolean free,
+ ByteBuffer[] frames) {
+ if (free) { // set MSB to 1 (for free)
+ frames[frameIx].put(offset,
+ (byte) (((frames[frameIx]).array()[offset]) | 0x80));
+ } else { // set MSB to 0 (for occupied)
+ frames[frameIx].put(offset,
+ (byte) (((frames[frameIx]).array()[offset]) & 0x7F));
+ }
+ }
+
+ /**
+ * Rounds the used length up so that the slot (payload plus header and
+ * footer) is a multiple of MINIMUM_FREE_SLOT_SIZE.
+ */
+ static int getActualLength(int l) {
+ int r = (l + 2 * HEADER_SIZE) % MINIMUM_FREE_SLOT_SIZE;
+ return (r == 0 ? l : (l + (BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE - r)));
+ }
+
+ /**
+ * Reads {@code size} big-endian bytes starting at {@code fromIndex} as an
+ * int, returning INVALID_INDEX when the invalid-marker bit is set.
+ */
+ private static int retrieveAsInt(ByteBuffer b, int fromIndex, int size,
+ ByteBuffer convertBuffer) {
+ if ((b.get(fromIndex) & INVALID) == INVALID) {
+ return INVALID_INDEX;
+ }
+
+ convertBuffer.clear();
+ for (int i = 0; i < 4 - size; i++) { // padding
+ convertBuffer.put(i, (byte) 0x00);
+ }
+
+ System.arraycopy(b.array(), fromIndex, convertBuffer.array(), 4 - size,
+ size);
+ return convertBuffer.getInt(0);
+ }
+
+ /**
+ * Stores the low {@code size} bytes of {@code value} big-endian at
+ * {@code fromIndex}; INVALID_INDEX is encoded as the invalid-marker byte.
+ */
+ private static void storeInt(ByteBuffer b, int fromIndex, int size,
+ int value) {
+ if (value == INVALID_INDEX) {
+ b.put(fromIndex, INVALID);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ b.put(fromIndex + i,
+ (byte) ((value >>> (8 * ((size - 1 - i)))) & 0xff));
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
new file mode 100644
index 0000000..1cc15cf
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * @author pouria
+ *
+ * Defines the required operations, needed for any memory manager, used
+ * in sorting with replacement selection, to manage the free spaces
+ */
+
+public interface IMemoryManager {
+
+ /**
+ * Allocates a free slot equal or greater than requested length. Pointer to
+ * the allocated slot is put in result, and gets returned to the caller. If
+ * no proper free slot is available, result would contain the null/invalid
+ * pointer (may vary between different implementations)
+ *
+ * @param length
+ * minimum required length (in bytes) of the slot
+ * @param result
+ * out-parameter; receives the pointer to the allocated slot
+ * @throws HyracksDataException
+ */
+ void allocate(int length, Slot result) throws HyracksDataException;
+
+ /**
+ * Unallocates the specified slot (and returns it back to the free slots
+ * set)
+ *
+ * @param s
+ * pointer to the slot being freed
+ * @return the total length of the unallocated slot
+ * @throws HyracksDataException
+ */
+ int unallocate(Slot s) throws HyracksDataException;
+
+ /**
+ * @param frameIndex
+ * @return the specified frame, from the set of memory buffers, being
+ * managed by this memory manager
+ */
+ ByteBuffer getFrame(int frameIndex);
+
+ /**
+ * Writes the specified tuple into the specified memory slot (denoted by
+ * frameIx and offset)
+ *
+ * @param frameIx
+ * @param offset
+ * @param src
+ * @param tIndex
+ * @return true iff the tuple was written successfully
+ */
+ boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src,
+ int tIndex);
+
+ /**
+ * Reads the specified tuple (denoted by frameIx and offset) and appends it
+ * to the passed FrameTupleAppender
+ *
+ * @param frameIx
+ * @param offset
+ * @param dest
+ * @return true iff the tuple fit into (and was appended to) dest
+ */
+ boolean readTuple(int frameIx, int offset, FrameTupleAppender dest);
+
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
new file mode 100644
index 0000000..1d55bc8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+
+/**
+ * @author pouria
+ *
+ * Interface for the Run Generator
+ */
+public interface IRunGenerator extends IFrameWriter {
+ /**
+ *
+ * @return the list of generated runs, each stored sorted
+ */
+ public List<IFrameReader> getRuns();
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
new file mode 100644
index 0000000..2cc731c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+/**
+ * @author pouria Defines the selection tree, used in sorting with replacement
+ * selection to manage the order of outputting tuples into the runs,
+ * during the run generation phase. At each point of time, this tree
+ * contains tuples, belonging to two different runs: - Current run,
+ * being written to the output - Next run
+ */
+public interface ISelectionTree {
+ /**
+ * Inserts a new element into the selection tree.
+ *
+ * @param element
+ * contains the pointer to the memory slot, containing the tuple,
+ * along with its run number
+ */
+ void insert(int[] element);
+
+ /**
+ * @return Removes and returns the smallest element in the tree
+ */
+ int[] getMin();
+
+ /**
+ * @return Removes and returns the largest element in the tree
+ */
+ int[] getMax();
+
+ /**
+ * @return True if the selection tree does not have any element, and false
+ * otherwise.
+ */
+ boolean isEmpty();
+
+ /**
+ * Removes all the elements in the tree
+ */
+ void reset();
+
+ /**
+ *
+ * @return Returns, but does NOT remove, the smallest element in the tree
+ */
+ int[] peekMin();
+
+ /**
+ *
+ * @return Returns, but does NOT remove, the largest element in the tree
+ */
+ int[] peekMax();
+
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
new file mode 100644
index 0000000..e617ccb
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -0,0 +1,229 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * @author pouria
+ *
+ * Operator descriptor for sorting with replacement. Consists of two
+ * phases:
+ *
+ * - Run Generation: Denoted by OptimizedSortActivity below, in which
+ * sort runs get generated from the input data. This phases uses the
+ * Selection Tree and Memory Manager to benefit from the replacement
+ * selection optimization, to create runs which are longer than the
+ * available memory size.
+ *
+ * - Merging: Denoted by MergeActivity below, in which runs (generated
+ * in the previous phase) get merged via a merger. Each run has a single
+ * buffer in memory, and a priority queue is used to select the top
+ * tuple each time and send it to the new run/output
+ */
+public class OptimizedExternalSortOperatorDescriptor extends
+ AbstractOperatorDescriptor {
+
+ // Sentinel for "no top-K limit": the whole input gets sorted
+ private static final int NO_LIMIT = -1;
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int SORT_ACTIVITY_ID = 0;
+ private static final int MERGE_ACTIVITY_ID = 1;
+
+ private final int[] sortFields;
+ private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final int memSize; // in frames
+ private final int outputLimit; // top-K limit, or NO_LIMIT
+
+ public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
+ int framesLimit, int[] sortFields,
+ IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor) {
+ this(spec, framesLimit, NO_LIMIT, sortFields, null,
+ comparatorFactories, recordDescriptor);
+ }
+
+ public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
+ int framesLimit, int outputLimit, int[] sortFields,
+ IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor) {
+ this(spec, framesLimit, outputLimit, sortFields, null,
+ comparatorFactories, recordDescriptor);
+ }
+
+ public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
+ int memSize, int outputLimit, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor) {
+ super(spec, 1, 1);
+ this.memSize = memSize;
+ this.outputLimit = outputLimit;
+ this.sortFields = sortFields;
+ this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+ this.comparatorFactories = comparatorFactories;
+ if (memSize <= 1) {
+ throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
+ }
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ // Sort feeds merge; the blocking edge forces all runs to be generated
+ // before merging starts
+ OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(
+ odId, SORT_ACTIVITY_ID));
+ OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(
+ odId, MERGE_ACTIVITY_ID));
+
+ builder.addActivity(osa);
+ builder.addSourceEdge(0, osa, 0);
+
+ builder.addActivity(oma);
+ builder.addTargetEdge(0, oma, 0);
+
+ builder.addBlockingEdge(osa, oma);
+ }
+
+ /**
+ * Carries the generated runs from the sort activity to the merge activity.
+ * NOTE(review): toBytes/fromBytes are empty, so this state is not actually
+ * serializable across node boundaries — presumably both activities run on
+ * the same node; confirm.
+ */
+ public static class OptimizedSortTaskState extends AbstractTaskState {
+ private List<IFrameReader> runs;
+
+ public OptimizedSortTaskState() {
+ }
+
+ private OptimizedSortTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+ }
+
+ /** Run-generation phase: consumes the input and produces sorted runs. */
+ private class OptimizedSortActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public OptimizedSortActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider,
+ final int partition, int nPartitions) {
+ // Pick the run generator based on whether a top-K limit was given
+ final IRunGenerator runGen;
+ if (outputLimit == NO_LIMIT) {
+ runGen = new OptimizedExternalSortRunGenerator(ctx, sortFields,
+ firstKeyNormalizerFactory, comparatorFactories,
+ recordDescriptors[0], memSize);
+ } else {
+ runGen = new OptimizedExternalSortRunGeneratorWithLimit(ctx,
+ sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, recordDescriptors[0], memSize,
+ outputLimit);
+ }
+
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ @Override
+ public void open() throws HyracksDataException {
+
+ runGen.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
+ runGen.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // Publish the generated runs for the merge activity
+ OptimizedSortTaskState state = new OptimizedSortTaskState(
+ ctx.getJobletContext().getJobId(), new TaskId(
+ getActivityId(), partition));
+ runGen.close();
+ state.runs = runGen.getRuns();
+ env.setTaskState(state);
+
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ runGen.fail();
+ }
+ };
+ return op;
+ }
+ }
+
+ /** Merge phase: merges the generated runs and pushes the sorted output. */
+ private class OptimizedMergeActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public OptimizedMergeActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider,
+ final int partition, int nPartitions) {
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ // Fetch the runs published by the sort activity of the
+ // same partition
+ OptimizedSortTaskState state = (OptimizedSortTaskState) env
+ .getTaskState(new TaskId(new ActivityId(
+ getOperatorId(), SORT_ACTIVITY_ID),
+ partition));
+
+ List<IFrameReader> runs = state.runs;
+
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i]
+ .createBinaryComparator();
+ }
+
+ OptimizedExternalSortRunMerger merger = new OptimizedExternalSortRunMerger(
+ ctx, outputLimit, runs, sortFields, comparators,
+ recordDescriptors[0], memSize, writer);
+
+ merger.process();
+
+ }
+ };
+ return op;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
new file mode 100644
index 0000000..13ba5c7
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
@@ -0,0 +1,313 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+/**
+ * @author pouria
+ *
+ * This class implements the run generator for sorting with replacement
+ * selection, where there is no limit on the output, i.e. the whole data
+ * should be sorted. A SortMinHeap is used as the selectionTree to
+ * decide the order of writing tuples into the runs, while memory
+ * manager is based on a binary search tree to allocate tuples in the
+ * memory.
+ *
+ * The overall process is as follows: - Read the input data frame by
+ * frame. For each tuple T in the current frame: - Try to allocate a
+ * memory slot for writing T along with the attached header/footer (for
+ * memory management purpose) - If T can not be allocated, try to output
+ * as many as tuples, currently resident in memory, so that a free slot,
+ * large enough to hold T, gets created. MinHeap decides about which
+ * tuple should be output at each step. Write T into the memory. -
+ * Calculate the runID of T (based on the last output tuple for the
+ * current run). It is either the current run or the next run. Also
+ * calculate Poorman's Normalized Key (PNK) for T, to make comparisons
+ * faster later. - Create a heap element for T, containing its runID,
+ * the slot ptr to its memory location, and its PNK. - Insert the heap
+ * element into the heap. - Upon closing, write all the tuples,
+ * currently resident in memory, into their corresponding run(s). Again
+ * min heap decides about which tuple is the next for output.
+ *
+ * OptimizedSortOperatorDescriptor will merge the generated runs, to
+ * generate the final sorted output of the data.
+ */
+public class OptimizedExternalSortRunGenerator implements IRunGenerator {
+ private final IHyracksTaskContext ctx;
+ private final int[] sortFields;
+ private final INormalizedKeyComputer nkc;
+ IBinaryComparatorFactory[] comparatorFactories;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDescriptor;
+ private final List<IFrameReader> runs;
+
+ private ISelectionTree sTree;
+ private IMemoryManager memMgr;
+
+ private final int memSize;
+ private FrameTupleAccessor inputAccessor; // Used to read tuples in nextFrame()
+ private FrameTupleAppender outputAppender; // Used to write tuple to the dedicated output buffer
+ private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples into run(s)
+ private FrameTupleAccessor lastRecordAccessor; // Used to read last output record from the output buffer
+ private int curRunSize;
+ private int lastTupleIx; // Holds index of last output tuple in the dedicated output buffer
+ private Slot allocationPtr; // Contains the ptr to the allocated memory slot by the memory manager for the new tuple
+ private Slot outputedTuple; // Contains the ptr to the next tuple chosen by the selectionTree to output
+ private int[] sTreeTop;
+
+ RunFileWriter writer;
+
+ private boolean newRun; // true until the first tuple of the current run is output
+ private int curRunId;
+
+ public OptimizedExternalSortRunGenerator(IHyracksTaskContext ctx,
+ int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, int memSize) {
+ this.ctx = ctx;
+ this.sortFields = sortFields;
+ nkc = firstKeyNormalizerFactory == null ? null
+ : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ this.comparatorFactories = comparatorFactories;
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ this.recordDescriptor = recordDesc;
+ this.runs = new LinkedList<IFrameReader>();
+ this.memSize = memSize;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ // Set up accessors/appenders, the memory manager and the selection
+ // tree, then open the very first run
+ runs.clear();
+ inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptor);
+ outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputBuffer = ctx.allocateFrame();
+ outputAppender.reset(outputBuffer, true);
+ lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptor);
+
+ this.memMgr = new BSTMemMgr(ctx, memSize);
+ this.sTree = new SortMinHeap(ctx, sortFields, comparatorFactories,
+ recordDescriptor, memMgr);
+ this.allocationPtr = new Slot();
+ this.outputedTuple = new Slot();
+ this.sTreeTop = new int[] { -1, -1, -1, -1 };
+ curRunId = -1;
+ openNewRun();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inputAccessor.reset(buffer);
+ byte[] bufferArray = buffer.array();
+ int tupleCount = inputAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; ++i) {
+ allocationPtr.clear();
+ int tLength = inputAccessor.getTupleEndOffset(i)
+ - inputAccessor.getTupleStartOffset(i);
+ memMgr.allocate(tLength, allocationPtr);
+ // If allocation failed, output resident tuples until a large
+ // enough free slot is created, then retry
+ while (allocationPtr.isNull()) {
+ int unAllocSize = -1;
+ while (unAllocSize < tLength) {
+ unAllocSize = outputRecord();
+ if (unAllocSize < 1) {
+ throw new HyracksDataException(
+ "Unable to allocate space for the new tuple, while there is no more tuple to output");
+ }
+ }
+ memMgr.allocate(tLength, allocationPtr);
+ // NOTE(review): debug-only compensation of BSTMemMgr's
+ // internal lookup counter for the retry — confirm
+ ((BSTMemMgr) memMgr)._debug_decLookupCount();
+ }
+ memMgr.writeTuple(allocationPtr.getFrameIx(),
+ allocationPtr.getOffset(), inputAccessor, i);
+ int runId = getRunId(inputAccessor, i);
+ int pnk = getPNK(inputAccessor, i, bufferArray);
+ int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
+ allocationPtr.getOffset(), pnk };
+ sTree.insert(entry);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ while (!sTree.isEmpty()) { // Outputting remaining elements in the
+ // selectionTree
+ outputRecord();
+ }
+ if (outputAppender.getTupleCount() > 0) { // Writing out very last
+ // resident records to file
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ outputAppender.reset(outputBuffer, true);
+ writer.close();
+ runs.add(writer.createReader());
+ }
+
+ public List<IFrameReader> getRuns() {
+ return runs;
+ }
+
+ /**
+ * Pops the minimum tuple off the selection tree, appends it to the
+ * current run (switching runs when its runId differs from curRunId), and
+ * frees its memory slot.
+ *
+ * @return the total length of the freed slot
+ */
+ private int outputRecord() throws HyracksDataException {
+ outputedTuple.clear();
+ sTreeTop = sTree.getMin();
+ if (!isEntryValid(sTreeTop)) {
+ throw new HyracksDataException(
+ "Invalid outputed tuple (Top of the selection tree is invalid)");
+ }
+
+ if (sTreeTop[SortMinHeap.RUN_ID_IX] != curRunId) { // We need to switch
+ // runs
+ openNewRun();
+ }
+
+ int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+ int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+ // If the append fails the output buffer is full: flush it and retry
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ throw new HyracksDataException(
+ "Can not append to the ouput buffer in sort");
+ }
+ lastTupleIx = 0;
+ } else {
+ lastTupleIx++;
+ }
+ outputedTuple.set(tFrameIx, tOffset);
+ newRun = false;
+ curRunSize++;
+ return memMgr.unallocate(outputedTuple);
+
+ }
+
+ /**
+ * Computes the Poorman's Normalized Key of the first sort field. The
+ * backing byte array is passed in for better performance (not converting
+ * for each and every tuple).
+ */
+ private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
+ int sfIdx = sortFields[0];
+ int tStart = fta.getTupleStartOffset(tIx);
+ int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
+ int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
+ int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
+ return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel
+ - f0StartRel));
+ }
+
+ /**
+ * Compares the current record to the last output record to decide the
+ * current record's runId: the current run if it sorts at or after the
+ * last output tuple, otherwise the next run.
+ */
+ private int getRunId(FrameTupleAccessor fta, int tupIx) {
+ if (newRun) { // Very first record for a new run
+ return curRunId;
+ }
+
+ byte[] lastRecBuff = outputBuffer.array();
+ lastRecordAccessor.reset(outputBuffer);
+ int lastStartOffset = lastRecordAccessor
+ .getTupleStartOffset(lastTupleIx);
+
+ ByteBuffer fr2 = fta.getBuffer();
+ byte[] curRecBuff = fr2.array();
+ int r2StartOffset = fta.getTupleStartOffset(tupIx);
+
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset
+ + (fIdx - 1) * 4);
+ int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
+ int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength()
+ + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
+ * 4);
+ int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+ int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+ int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2,
+ l2);
+ if (c != 0) {
+ if (c <= 0) {
+ return curRunId;
+ } else {
+ return (curRunId + 1);
+ }
+ }
+ }
+ return curRunId;
+ }
+
+ private void openNewRun() throws HyracksDataException {
+ if (writer != null) { // There is a prev run, so flush its tuples and
+ // close it first
+ if (outputAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ outputAppender.reset(outputBuffer, true);
+ writer.close();
+ runs.add(writer.createReader());
+ }
+
+ // NOTE(review): the run-file prefix reuses ExternalSortRunGenerator's
+ // class name — presumably intentional; confirm
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalSortRunGenerator.class.getSimpleName());
+ writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ curRunId++;
+ newRun = true;
+ curRunSize = 0;
+ lastTupleIx = -1;
+ }
+
+ /** @return true iff the heap entry carries a valid runId and slot ptr. */
+ private boolean isEntryValid(int[] entry) {
+ return ((entry[SortMinHeap.RUN_ID_IX] > -1)
+ && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
new file mode 100644
index 0000000..df40577
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
@@ -0,0 +1,475 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+/**
+ * @author pouria
+ *
+ * This class implements the run generator for sorting with replacement
+ * selection, where there is a limit on the output, i.e. we are looking
+ * for top-k tuples (first k smallest tuples w.r.t sorting keys), if
+ * data were sorted, where k is a specific limit, decided in advance.
+ *
+ * A SortMinMaxHeap is used as the selectionTree to decide the order of
+ * writing tuples into the runs, and also to prune tuples (if possible).
+ * Memory manager is based on a binary search tree to allocate tuples.
+ *
+ * The overall process is as follows (Assuming that the limit is K): -
+ * Read the input data frame by frame. For each tuple T in the current
+ * frame: - If currentRun R has reached the limit of K on the size, and
+ * (T > maximum tuple of R), then ignore T. - Otherwise, try to allocate
+ * a memory slot for writing T along with the attached header/footer
+ * (for memory management purpose) - If T can not be allocated, try to
+ * output as many tuples, currently resident in memory, as needed so that
+ * a free slot, large enough to hold T, gets created. MinMaxHeap decides
+ * which tuple should be output at each step. Write T into the
+ * memory. - Calculate the runID of T (based on the last output tuple
+ * for the current run). It is either the current run or the next run.
+ * Also calculate Poorman's Normalized Key (PNK) for T, to make
+ * comparisons faster later. - Create a heap element for T, containing
+ * its runID, the slot ptr to its memory location, and its PNK. - If
+ * runID is the nextRun, insert the heap element into the heap, and
+ * increment the size of nextRun in the stats. - If runID is the
+ * currentRun: - If currentRun has not hit the limit of k, insert the
+ * element into the heap, and increase currentRun size in the stats. -
+ * (arriving here, currentRun has hit the limit of K, while T is less
+ * than the max). Discard the current max for the current run (by
+ * pop-ing it from the heap and unallocating its memory location) and
+ * insert the heap element into the heap. No need to change the
+ * currentRun size as we are replacing an old element (the old max) with
+ * T.
+ *
+ * - Upon closing, write all the tuples, currently resident in memory,
+ * into their corresponding run(s).
+ *
+ * - Note that upon opening a new Run R, if size of R (based on stats)
+ * is S and (S > K), then (S-K) current maximum tuples of R (which are
+ * resident in memory) get discarded at the beginning. MinMax heap can
+ * be used to find these tuples.
+ */
+public class OptimizedExternalSortRunGeneratorWithLimit implements
+ IRunGenerator {
+
+ private final IHyracksTaskContext ctx;
+ private final int[] sortFields;
+ private final INormalizedKeyComputer nkc; // may be null: PNK optimization disabled
+ IBinaryComparatorFactory[] comparatorFactories;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDescriptor;
+ private final List<IFrameReader> runs; // readers for the generated runs, in creation order
+
+ private ISelectionTree sTree; // min-max heap ordering the in-memory tuples
+ private IMemoryManager memMgr; // BST-based allocator for tuple slots
+
+ private final int memSize; // sort memory budget, in frames
+ private FrameTupleAccessor inputAccessor; // used to read tuples in nextFrame()
+ private FrameTupleAppender outputAppender; // used to write tuples to the dedicated output buffer
+ private ByteBuffer outputBuffer; // dedicated output buffer to write tuples into run(s)
+ private FrameTupleAccessor lastRecordAccessor; // used to read the last output record back from the output buffer
+ private FrameTupleAccessor fta2; // used to read the current max record from memory
+ private final int outputLimit; // the "K" in top-K: cap on each run's size
+ private int curRunSize; // number of in-memory tuples belonging to the current run
+ private int nextRunSize; // number of in-memory tuples already assigned to the next run
+ private int lastTupleIx; // index of the last output tuple within the dedicated output buffer
+ private Slot allocationPtr; // ptr to the memory slot allocated for the incoming tuple
+ private Slot outputedTuple; // ptr to the next tuple chosen by the selection tree to output
+ private Slot discard; // scratch ptr for slots being unallocated (pruned maxima)
+ private int[] sTreeTop;
+ private int[] peek;
+ RunFileWriter writer; // writer for the run currently being produced
+ private boolean newRun; // true until the first tuple of the current run has been output
+ private int curRunId;
+
+ public OptimizedExternalSortRunGeneratorWithLimit(IHyracksTaskContext ctx,
+ int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, int memSize, int limit) {
+
+ this.ctx = ctx;
+ this.sortFields = sortFields;
+ nkc = firstKeyNormalizerFactory == null ? null
+ : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ this.comparatorFactories = comparatorFactories;
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ this.recordDescriptor = recordDesc;
+ this.runs = new LinkedList<IFrameReader>();
+ this.memSize = memSize;
+
+ this.outputLimit = limit;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ // (Re-)initialize all per-instance state and open the first run.
+ runs.clear();
+ inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptor);
+ outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputBuffer = ctx.allocateFrame();
+ outputAppender.reset(outputBuffer, true);
+ lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptor);
+ fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ this.memMgr = new BSTMemMgr(ctx, memSize);
+ this.sTree = new SortMinMaxHeap(ctx, sortFields, comparatorFactories,
+ recordDescriptor, memMgr);
+ this.allocationPtr = new Slot();
+ this.outputedTuple = new Slot();
+ this.sTreeTop = new int[] { -1, -1, -1, -1 };
+ this.peek = new int[] { -1, -1, -1, -1 };
+ this.discard = new Slot();
+
+ curRunId = -1; // openNewRun() increments this to 0
+ curRunSize = 0;
+ nextRunSize = 0;
+ openNewRun();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inputAccessor.reset(buffer);
+ byte[] bufferArray = buffer.array();
+ int tupleCount = inputAccessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ // Prune: if the current run is already full and this tuple is >= the
+ // in-memory maximum, it can never make the top-K of this run.
+ if (curRunSize >= outputLimit) {
+ peek = sTree.peekMax();
+ if (isEntryValid(peek)
+ && compareRecords(inputAccessor, i,
+ peek[SortMinMaxHeap.FRAME_IX],
+ peek[SortMinMaxHeap.OFFSET_IX]) >= 0) {
+ continue;
+ }
+ }
+
+ // Allocate a memory slot for the tuple; if memory is exhausted, keep
+ // outputting tuples until a freed slot is large enough to hold it.
+ allocationPtr.clear();
+ int tLength = inputAccessor.getTupleEndOffset(i)
+ - inputAccessor.getTupleStartOffset(i);
+ memMgr.allocate(tLength, allocationPtr);
+ while (allocationPtr.isNull()) {
+ int unAllocSize = -1;
+ while (unAllocSize < tLength) {
+ unAllocSize = outputRecord();
+ if (unAllocSize < 1) {
+ throw new HyracksDataException(
+ "Unable to allocate space for the new tuple, while there is no more tuple to output");
+ }
+ }
+ memMgr.allocate(tLength, allocationPtr);
+ }
+
+ int pnk = getPNK(inputAccessor, i, bufferArray);
+ int runId = getRunId(inputAccessor, i);
+ if (runId != curRunId) { // tuple belongs to the next run
+ memMgr.writeTuple(allocationPtr.getFrameIx(),
+ allocationPtr.getOffset(), inputAccessor, i);
+ int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
+ allocationPtr.getOffset(), pnk };
+ sTree.insert(entry);
+ nextRunSize++;
+ continue;
+ }
+ // Belongs to the current run and there is still room in the top-K.
+ if (curRunSize < outputLimit) {
+ memMgr.writeTuple(allocationPtr.getFrameIx(),
+ allocationPtr.getOffset(), inputAccessor, i);
+ int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
+ allocationPtr.getOffset(), pnk };
+ sTree.insert(entry);
+ curRunSize++;
+ continue;
+ }
+
+ // Current run is full: keep the tuple only if it beats the current max.
+ peek = sTree.peekMax();
+ if (compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
+ peek[SortMinMaxHeap.OFFSET_IX]) > 0) {
+ continue;
+ }
+ // Replace the max: discard it and insert the new tuple. curRunSize is
+ // unchanged since this is a one-for-one replacement.
+ peek = sTree.getMax();
+ discard.set(peek[SortMinMaxHeap.FRAME_IX],
+ peek[SortMinMaxHeap.OFFSET_IX]);
+ memMgr.unallocate(discard);
+ memMgr.writeTuple(allocationPtr.getFrameIx(),
+ allocationPtr.getOffset(), inputAccessor, i);
+ int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
+ allocationPtr.getOffset(), pnk };
+ sTree.insert(entry);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ while (!sTree.isEmpty()) { // output remaining in-memory tuples into their run(s)
+ outputRecordForClose();
+ }
+
+ if (outputAppender.getTupleCount() > 0) { // flush the last partially-filled frame
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+
+ writer.close();
+ runs.add(writer.createReader());
+ }
+
+ public List<IFrameReader> getRuns() {
+ return runs;
+ }
+
+ /**
+ * Outputs the selection tree's minimum tuple to the current run and frees
+ * its memory slot. If the minimum belongs to the next run, the current run
+ * is closed first and any tuples beyond the next run's K-limit are pruned.
+ *
+ * @return the size (in bytes) of the freed memory slot; used by the caller
+ *         to decide whether enough space was reclaimed for a new tuple
+ */
+ private int outputRecord() throws HyracksDataException {
+ outputedTuple.clear();
+ sTreeTop = sTree.getMin();
+ if (!isEntryValid(sTreeTop)) {
+ throw new HyracksDataException(
+ "Invalid outputed tuple (Top of the selection tree is invalid)");
+ }
+ int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+ int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+ if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] == curRunId) {
+ // Append to the output buffer; on overflow flush the frame and retry.
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ throw new HyracksDataException(
+ "Can not append to the ouput buffer in sort");
+ }
+ lastTupleIx = 0;
+ } else {
+ lastTupleIx++;
+ }
+ outputedTuple.set(tFrameIx, tOffset);
+ newRun = false;
+ return memMgr.unallocate(outputedTuple);
+ }
+ // Minimum belongs to the next run: close the current one and, if the next
+ // run already holds more than K tuples in memory, prune its current maxima.
+ openNewRun();
+ int popCount = curRunSize - outputLimit;
+ int l = 0;
+ int maxFreedSpace = 0;
+ for (int p = 0; p < popCount; p++) {
+ peek = sTree.getMax();
+ if (!isEntryValid(peek)) {
+ throw new HyracksDataException(
+ "Invalid Maximum extracted from MinMaxHeap");
+ }
+ discard.set(peek[SortMinMaxHeap.FRAME_IX],
+ peek[SortMinMaxHeap.OFFSET_IX]);
+ l = memMgr.unallocate(discard);
+ if (l > maxFreedSpace) {
+ maxFreedSpace = l;
+ }
+ curRunSize--;
+ }
+
+ if (maxFreedSpace != 0) {
+ return maxFreedSpace;
+ }
+ // No max was discarded. We just flushed the previous run, so the output
+ // buffer is empty and the append must succeed.
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ throw new HyracksDataException(
+ "Can not append to the ouput buffer in sort");
+ }
+ lastTupleIx = 0;
+ outputedTuple.set(tFrameIx, tOffset);
+ newRun = false;
+ return memMgr.unallocate(outputedTuple);
+ }
+
+ /**
+ * Drains one tuple from the selection tree during close(). Unlike
+ * outputRecord() it does not prune or report freed space, since no further
+ * allocations will happen.
+ */
+ private void outputRecordForClose() throws HyracksDataException {
+ sTreeTop = sTree.getMin();
+ if (!isEntryValid(sTreeTop)) {
+ throw new HyracksDataException(
+ "Invalid outputed tuple (Top of the selection tree is invalid)");
+ }
+ int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+ int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+ if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] != curRunId) {
+ openNewRun();
+ }
+
+ // Append to the output buffer; on overflow flush the frame and retry.
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+ throw new HyracksDataException(
+ "Can not append to the ouput buffer in sort");
+ }
+ }
+ }
+
+ /**
+ * Computes the Poorman's Normalized Key (PNK) of a tuple's first sort field.
+ * The backing byte[] is passed in by the caller so it is not re-extracted
+ * for each and every tuple. Returns 0 when no normalized-key computer is set.
+ */
+ private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
+ int sfIdx = sortFields[0];
+ int tStart = fta.getTupleStartOffset(tIx);
+ int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
+ int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
+ int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
+ return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel
+ - f0StartRel));
+ }
+
+ /**
+ * Decides the run id of an incoming tuple by comparing it with the last
+ * tuple output for the current run: if it sorts before that tuple it must
+ * go to the next run (curRunId + 1), otherwise it can still join the
+ * current run.
+ */
+ private int getRunId(FrameTupleAccessor fta, int tupIx) {
+ if (newRun) { // very first record for a new run always joins it
+ return curRunId;
+ }
+
+ byte[] lastRecBuff = outputBuffer.array();
+ lastRecordAccessor.reset(outputBuffer);
+ int lastStartOffset = lastRecordAccessor
+ .getTupleStartOffset(lastTupleIx);
+
+ ByteBuffer fr2 = fta.getBuffer();
+ byte[] curRecBuff = fr2.array();
+ int r2StartOffset = fta.getTupleStartOffset(tupIx);
+
+ // Field-by-field comparison on the sort keys; field offsets are read
+ // directly from the tuples' field-slot headers (4 bytes per field).
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset
+ + (fIdx - 1) * 4);
+ int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
+ int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength()
+ + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
+ * 4);
+ int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+ int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+ int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2,
+ l2);
+ if (c != 0) {
+ if (c <= 0) {
+ return curRunId;
+ } else {
+ return (curRunId + 1);
+ }
+ }
+ }
+ return curRunId;
+ }
+
+ /**
+ * Compares an input tuple against a tuple resident in sort memory.
+ * Returns a negative value iff the first tuple sorts before the second.
+ * The in-memory offset is advanced past the memory manager's slot header.
+ */
+ private int compareRecords(FrameTupleAccessor fta1, int ix1, int fix2,
+ int offset2) {
+ ByteBuffer buff1 = fta1.getBuffer();
+ byte[] recBuff1 = buff1.array();
+ int offset1 = fta1.getTupleStartOffset(ix1);
+
+ offset2 += BSTNodeUtil.HEADER_SIZE;
+ ByteBuffer buff2 = memMgr.getFrame(fix2);
+ fta2.reset(buff2);
+ byte[] recBuff2 = buff2.array();
+
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : buff1
+ .getInt(offset1 + (fIdx - 1) * 4);
+ int f1End = buff1.getInt(offset1 + fIdx * 4);
+ int s1 = offset1 + fta1.getFieldSlotsLength() + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : buff2
+ .getInt(offset2 + (fIdx - 1) * 4);
+ int f2End = buff2.getInt(offset2 + fIdx * 4);
+ int s2 = offset2 + fta2.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+ int c = comparators[f].compare(recBuff1, s1, l1, recBuff2, s2, l2);
+
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+
+ }
+
+ /**
+ * Closes the current run (if any) and opens a fresh one. Tuples already
+ * assigned to the next run become the new current run's starting content,
+ * so curRunSize is seeded from nextRunSize.
+ */
+ private void openNewRun() throws HyracksDataException {
+ if (writer != null) { // there is a previous run: flush its tuples and close it first
+ if (outputAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ outputAppender.reset(outputBuffer, true);
+ writer.close();
+ runs.add(writer.createReader());
+ }
+
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalSortRunGenerator.class.getSimpleName());
+ writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ curRunId++;
+ newRun = true;
+ curRunSize = nextRunSize;
+ nextRunSize = 0;
+ lastTupleIx = -1;
+ }
+
+ /**
+ * An entry is valid iff its run id, frame index and offset are all set
+ * (i.e. not the -1 sentinel used for "empty heap" results).
+ */
+ private boolean isEntryValid(int[] entry) {
+ return ((entry[SortMinHeap.RUN_ID_IX] > -1)
+ && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
new file mode 100644
index 0000000..67f3498
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+/**
+ * @author pouria
+ *
+ * This class defines the code for the merger, which is used to merge
+ * the runs, generated previously through an implementation of
+ * IRunGenerator, for sorting with replacement selection.
+ *
+ * If number of input runs is less than the available memory frames,
+ * then merging can be done in one pass, by allocating one buffer per
+ * run, and one buffer as the output buffer. A priorityQueue is used to
+ * find the top tuple at each iteration, among all the runs' heads in
+ * memory (check RunMergingFrameReader for more details). Otherwise,
+ * assuming that we have R runs and M memory buffers, where (R > M), we
+ * first merge first (M-1) runs and create a new sorted run. Discarding
+ * the first (M-1) runs, now merging step gets applied recursively on
+ * the (R-M+2) remaining runs using the M memory buffers.
+ *
+ * Merging also takes the outputLimit L, if specified, into account
+ * during merging. Once the final pass is done on the runs (which is the
+ * pass that generates the final sorted output), as soon as the output
+ * hits the limit L, the process stops, closes, and returns.
+ */
+
+public class OptimizedExternalSortRunMerger {
+ private final IHyracksTaskContext ctx;
+ private final List<IFrameReader> runs; // pending runs; shrinks as passes complete
+ private final int[] sortFields;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDesc;
+ private final int framesLimit; // memory budget in frames (inputs + 1 output)
+ private final IFrameWriter writer; // final consumer of the merged output
+ private List<ByteBuffer> inFrames; // one input frame per run being merged
+ private ByteBuffer outFrame;
+ private FrameTupleAppender outFrameAppender;
+ private FrameTupleAccessor outFrameAccessor;
+ private final int outputLimit; // top-K limit; <= 0 means unlimited
+ private int currentSize; // tuples emitted so far in the final pass
+
+ public OptimizedExternalSortRunMerger(IHyracksTaskContext ctx,
+ int outputLimit, List<IFrameReader> runs, int[] sortFields,
+ IBinaryComparator[] comparators, RecordDescriptor recordDesc,
+ int framesLimit, IFrameWriter writer) {
+ this.ctx = ctx;
+ this.runs = new LinkedList<IFrameReader>(runs);
+ this.sortFields = sortFields;
+ this.comparators = comparators;
+ this.recordDesc = recordDesc;
+ this.framesLimit = framesLimit;
+ this.writer = writer;
+ this.outputLimit = outputLimit;
+ this.currentSize = 0;
+ }
+
+ /**
+ * Merges all runs into the final writer. A single run is streamed through
+ * directly (truncated at outputLimit if one is set); multiple runs are
+ * merged in repeated passes via doPass() until one sorted output remains.
+ */
+ public void process() throws HyracksDataException {
+ writer.open();
+
+ try {
+ outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDesc);
+ outFrame = ctx.allocateFrame();
+ outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outFrameAppender.reset(outFrame, true);
+
+ if (runs.size() == 1) {
+
+ if (outputLimit < 1) { // no limit: copy the run through verbatim
+ runs.get(0).open();
+ ByteBuffer nextFrame = ctx.allocateFrame();
+ while (runs.get(0).nextFrame(nextFrame)) {
+ FrameUtils.flushFrame(nextFrame, writer);
+ outFrameAppender.reset(nextFrame, true);
+ }
+ return;
+ }
+
+ // Limit present: copy whole frames while under the limit, then
+ // copy only the first (outputLimit - totalCount) tuples of the
+ // frame that crosses it.
+ int totalCount = 0;
+ runs.get(0).open();
+ FrameTupleAccessor fta = new FrameTupleAccessor(
+ ctx.getFrameSize(), recordDesc);
+ ByteBuffer nextFrame = ctx.allocateFrame();
+ while (totalCount <= outputLimit
+ && runs.get(0).nextFrame(nextFrame)) {
+ fta.reset(nextFrame);
+ int tupCount = fta.getTupleCount();
+ if ((totalCount + tupCount) < outputLimit) {
+ FrameUtils.flushFrame(nextFrame, writer);
+ totalCount += tupCount;
+ continue;
+ }
+ // The very last buffer, which exceeds the limit
+ int copyCount = outputLimit - totalCount;
+ outFrameAppender.reset(outFrame, true);
+ for (int i = 0; i < copyCount; i++) {
+ if (!outFrameAppender.append(fta, i)) {
+ throw new IllegalStateException();
+ }
+ totalCount++;
+ }
+ }
+
+ if (outFrameAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ outFrameAppender.reset(outFrame, true);
+ }
+
+ return;
+ }
+
+ // More than one run, actual merging is needed
+ inFrames = new ArrayList<ByteBuffer>();
+ for (int i = 0; i < framesLimit - 1; ++i) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ while (runs.size() > 0) {
+ try {
+ doPass(runs);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+
+ /**
+ * Performs one merge pass over the first min(runs, framesLimit - 1) runs.
+ * On the final pass (all runs fit in memory) output goes to the real
+ * writer, honoring outputLimit; otherwise a new intermediate run replaces
+ * the merged ones at the head of the list.
+ */
+ private void doPass(List<IFrameReader> runs) throws HyracksDataException {
+ FileReference newRun = null;
+ IFrameWriter writer = this.writer;
+ boolean finalPass = false;
+ if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
+ finalPass = true;
+ // Shrink inFrames so only one frame per remaining run is kept.
+ for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+ inFrames.remove(i);
+ }
+ } else {
+ newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
+ .getSimpleName());
+ writer = new RunFileWriter(newRun, ctx.getIOManager());
+ writer.open();
+ }
+ try {
+ // NOTE(review): the array is RunFileReader[] but elements come from
+ // runs (declared IFrameReader) — assumes all runs are RunFileReaders;
+ // verify against the run generators.
+ IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
+ for (int i = 0; i < inFrames.size(); i++) {
+ runCursors[i] = runs.get(i);
+ }
+ RunMergingFrameReader merger = new RunMergingFrameReader(ctx,
+ runCursors, inFrames, sortFields, comparators, recordDesc);
+ merger.open();
+ try {
+ while (merger.nextFrame(outFrame)) {
+ if (outputLimit > 0 && finalPass) {
+ outFrameAccessor.reset(outFrame);
+ int count = outFrameAccessor.getTupleCount();
+ if ((currentSize + count) > outputLimit) {
+ // Frame crosses the limit: copy only the tuples
+ // still under it into a partial frame, flush, stop.
+ ByteBuffer b = ctx.allocateFrame();
+ FrameTupleAppender partialAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ partialAppender.reset(b, true);
+ int copyCount = outputLimit - currentSize;
+ for (int i = 0; i < copyCount; i++) {
+ partialAppender.append(outFrameAccessor, i);
+ currentSize++;
+ }
+ FrameUtils.makeReadable(b);
+ FrameUtils.flushFrame(b, writer);
+ break;
+ } else {
+ FrameUtils.flushFrame(outFrame, writer);
+ currentSize += count;
+ }
+ } else {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ }
+ } finally {
+ merger.close();
+ }
+
+ if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
+ runs.clear();
+ return;
+ }
+
+ // Drop the merged runs; a non-final pass prepends the new run.
+ runs.subList(0, inFrames.size()).clear();
+ if (!finalPass) {
+ runs.add(0, ((RunFileWriter) writer).createReader());
+ }
+ } finally {
+ if (!finalPass) {
+ writer.close();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
new file mode 100644
index 0000000..85bcba2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+/**
+ * @author pouria Defines a slot in the memory, which can be a free or allocated
+ * slot. Memory is a set of frames, ordered as a list. Each tuple is
+ * stored in a slot, where the location of the slot is denoted by a pair
+ * of integers: - The index of the frame, in the list of frames in
+ * memory - The starting offset of the slot, within the specific
+ * frame
+ */
+public class Slot {
+
+ private int frameIx; // index of the frame within the memory manager's frame list
+ private int offset; // starting offset of the slot within that frame
+
+ /** Creates a null (unassigned) slot; both coordinates are INVALID_INDEX. */
+ public Slot() {
+ this.frameIx = BSTNodeUtil.INVALID_INDEX;
+ this.offset = BSTNodeUtil.INVALID_INDEX;
+ }
+
+ /** Creates a slot pointing at the given frame/offset location. */
+ public Slot(int frameIx, int offset) {
+ this.frameIx = frameIx;
+ this.offset = offset;
+ }
+
+ /** Re-points this slot at the given frame/offset location. */
+ public void set(int frameIx, int offset) {
+ this.frameIx = frameIx;
+ this.offset = offset;
+ }
+
+ public int getFrameIx() {
+ return frameIx;
+ }
+
+ public void setFrameIx(int frameIx) {
+ this.frameIx = frameIx;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
+
+ /** True iff either coordinate is unassigned (INVALID_INDEX). */
+ public boolean isNull() {
+ return (frameIx == BSTNodeUtil.INVALID_INDEX)
+ || (offset == BSTNodeUtil.INVALID_INDEX);
+ }
+
+ /** Resets this slot back to the null (unassigned) state. */
+ public void clear() {
+ this.frameIx = BSTNodeUtil.INVALID_INDEX;
+ this.offset = BSTNodeUtil.INVALID_INDEX;
+ }
+
+ /** Copies the coordinates of another slot into this one. */
+ public void copy(Slot s) {
+ this.frameIx = s.getFrameIx();
+ this.offset = s.getOffset();
+ }
+
+ @Override
+ public String toString() {
+ return "(" + frameIx + ", " + offset + ")";
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
new file mode 100644
index 0000000..d811de8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
@@ -0,0 +1,249 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ *
+ * @author pouria Implements a minimum binary heap, used as selection tree, for
+ * sort with replacement. This heap structure can only be used as the
+ * min heap (no access to the max element) Two elements in the heap are
+ * compared based on their run numbers, and sorting key(s):
+ *
+ * Considering two elements A and B:
+ *
+ * if RunNumber(A) > RunNumber(B) then A is larger than B if
+ * RunNumber(A) == RunNumber(B), then A is smaller than B, only if the
+ * value of the sort key(s) in B is greater than A (based on the sort
+ * comparator)
+ *
+ */
+public class SortMinHeap implements ISelectionTree {
+
+ // Element layout: [RunId][FrameIx][Offset][Poorman's Normalized Key]
+ static final int RUN_ID_IX = 0;
+ static final int FRAME_IX = 1;
+ static final int OFFSET_IX = 2;
+ final int PNK_IX = 3;
+
+ private final int[] sortFields;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDescriptor;
+ private final FrameTupleAccessor fta1;
+ private final FrameTupleAccessor fta2;
+
+ List<int[]> tree; // array-backed binary heap: children of i are 2i+1, 2i+2
+ IMemoryManager memMgr; // resolves frame indices to the actual tuple bytes
+
+ public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields,
+ IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, IMemoryManager memMgr) {
+ this.sortFields = sortFields;
+ this.comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ this.comparators[i] = comparatorFactories[i]
+ .createBinaryComparator();
+ }
+ this.recordDescriptor = recordDesc;
+ fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ this.memMgr = memMgr;
+
+ this.tree = new ArrayList<int[]>();
+ }
+
+ /** Removes and returns the minimum element, or {-1,-1,-1,-1} if empty. */
+ @Override
+ public int[] getMin() {
+ return (tree.size() > 0 ? delete(0) : new int[] { -1, -1, -1, -1 });
+ }
+
+ /** Returns a copy of the minimum element without removing it. */
+ @Override
+ public int[] peekMin() {
+ if (tree.size() == 0) {
+ return (new int[] { -1, -1, -1, -1 });
+ }
+ int[] top = tree.get(0);
+ return new int[] { top[0], top[1], top[2], top[3] };
+ }
+
+ @Override
+ public void insert(int[] e) {
+ tree.add(e);
+ siftUp(tree.size() - 1);
+ }
+
+ @Override
+ public void reset() {
+ this.tree.clear();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return (tree.size() < 1);
+ }
+
+ public int _debugGetSize() {
+ return tree.size();
+ }
+
+ /**
+ * Removes the element at index nix: the last element takes its place and is
+ * then sifted up or down, whichever direction restores the heap property.
+ */
+ private int[] delete(int nix) {
+ int[] nv = tree.get(nix);
+ int[] last = tree.remove(tree.size() - 1);
+
+ if (tree.size() > 0) {
+ tree.set(nix, last);
+ } else {
+ return nv;
+ }
+
+ int pIx = getParent(nix);
+ if (pIx > -1 && (compare(last, tree.get(pIx)) < 0)) {
+ siftUp(nix);
+ } else {
+ siftDown(nix);
+ }
+ return nv;
+ }
+
+ private void siftUp(int nodeIx) {
+ int p = getParent(nodeIx);
+ if (p < 0) {
+ return;
+ }
+ while (p > -1 && (compare(nodeIx, p) < 0)) {
+ swap(p, nodeIx);
+ nodeIx = p;
+ p = getParent(nodeIx);
+ if (p < 0) { // We are at the root
+ return;
+ }
+ }
+ }
+
+ private void siftDown(int nodeIx) {
+ int mix = getMinOfChildren(nodeIx);
+ if (mix < 0) {
+ return;
+ }
+ while (mix > -1 && (compare(mix, nodeIx) < 0)) {
+ swap(mix, nodeIx);
+ nodeIx = mix;
+ mix = getMinOfChildren(nodeIx);
+ if (mix < 0) { // We hit the leaf level
+ return;
+ }
+ }
+ }
+
+ // first < sec : -1
+ private int compare(int nodeSIx1, int nodeSIx2) {
+ int[] n1 = tree.get(nodeSIx1);
+ int[] n2 = tree.get(nodeSIx2);
+ return (compare(n1, n2));
+ }
+
+ // first < sec : -1
+ // Order: run id first, then PNK (compared as unsigned ints), and only on a
+ // PNK tie the full key comparison against the actual tuple bytes.
+ private int compare(int[] n1, int[] n2) {
+ // Compare Run Numbers
+ if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
+ return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
+ }
+
+ // Compare Poor man Normalized Keys
+ if (n1[PNK_IX] != n2[PNK_IX]) {
+ return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1
+ : 1;
+ }
+
+ return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]),
+ n1[OFFSET_IX], n2[OFFSET_IX]);
+ }
+
+ private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset,
+ int r2StartOffset) {
+ byte[] b1 = fr1.array();
+ byte[] b2 = fr2.array();
+ fta1.reset(fr1);
+ fta2.reset(fr2);
+ // Skip the memory manager's slot header to reach the tuple data.
+ int headerLen = BSTNodeUtil.HEADER_SIZE;
+ r1StartOffset += headerLen;
+ r2StartOffset += headerLen;
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1)
+ * 4);
+ int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
+ int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
+ * 4);
+ int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+ int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ private int getMinOfChildren(int nix) { // returns index of min child
+ int lix = getLeftChild(nix);
+ if (lix < 0) {
+ return -1;
+ }
+ int rix = getRightChild(nix);
+ if (rix < 0) {
+ return lix;
+ }
+ return ((compare(lix, rix) < 0) ? lix : rix);
+ }
+
+ private void swap(int n1Ix, int n2Ix) {
+ int[] temp = tree.get(n1Ix);
+ tree.set(n1Ix, tree.get(n2Ix));
+ tree.set(n2Ix, temp);
+ }
+
+ private int getLeftChild(int ix) {
+ int lix = 2 * ix + 1;
+ return ((lix < tree.size()) ? lix : -1);
+ }
+
+ private int getRightChild(int ix) {
+ int rix = 2 * ix + 2;
+ return ((rix < tree.size()) ? rix : -1);
+ }
+
+ private int getParent(int ix) {
+ // The root has no parent. (The previous form "(ix - 1) / 2" only gave a
+ // harmless answer for ix == 0 because Java division truncates toward
+ // zero, making (-1) / 2 == 0 and the subsequent self-compare a no-op;
+ // make the sentinel explicit instead of relying on that coincidence.)
+ return (ix <= 0 ? -1 : (ix - 1) / 2);
+ }
+
+ private ByteBuffer getFrame(int frameIx) {
+ return (memMgr.getFrame(frameIx));
+ }
+
+ /** Not supported by a min-only heap; logs and returns null. */
+ @Override
+ public int[] getMax() {
+ System.err.println("getMax() method not implemented for Min Heap");
+ return null;
+ }
+
+ /** Not supported by a min-only heap; logs and returns null. */
+ @Override
+ public int[] peekMax() {
+ System.err.println("peekMax() method not implemented for Min Heap");
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
new file mode 100644
index 0000000..ab66245
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
@@ -0,0 +1,394 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * @author pouria
+ *
+ * Implements a MinMax binary heap, used as the selection tree, in
+ * sorting with replacement. Check SortMinHeap for details on comparing
+ * elements.
+ */
+public class SortMinMaxHeap implements ISelectionTree {
+ static final int RUN_ID_IX = 0;
+ static final int FRAME_IX = 1;
+ static final int OFFSET_IX = 2;
+ final int PNK_IX = 3;
+
+ static final int NOT_EXIST = -1;
+
+ private final int[] sortFields;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor recordDescriptor;
+ private final FrameTupleAccessor fta1;
+ private final FrameTupleAccessor fta2;
+
+ List<int[]> tree;
+ IMemoryManager memMgr;
+
+ public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields,
+ IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, IMemoryManager memMgr) {
+ this.sortFields = sortFields;
+ this.comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ this.comparators[i] = comparatorFactories[i]
+ .createBinaryComparator();
+ }
+ this.recordDescriptor = recordDesc;
+ fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ this.memMgr = memMgr;
+
+ this.tree = new ArrayList<int[]>();
+ }
+
+ @Override
+ public void insert(int[] element) {
+ tree.add(element);
+ bubbleUp(tree.size() - 1);
+ }
+
+ @Override
+ public int[] getMin() {
+ return (tree.size() > 0 ? delete(0) : new int[] { -1, -1, -1, -1 });
+ }
+
+ @Override
+ public void reset() {
+ this.tree.clear();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return (tree.size() < 1);
+ }
+
+ @Override
+ public int[] peekMin() {
+ if (tree.size() == 0) {
+ return (new int[] { -1, -1, -1, -1 });
+ }
+ int[] top = tree.get(0);
+ return new int[] { top[0], top[1], top[2], top[3] };
+ }
+
+ @Override
+ public int[] getMax() {
+ if (tree.size() == 1) {
+ return tree.remove(0);
+ }
+ if (tree.size() > 1) {
+ int lc = getLeftChild(0);
+ int rc = getRightChild(0);
+ if (rc == -1) {
+ return delete(lc);
+ }
+ return (compare(lc, rc) < 0) ? delete(rc) : delete(lc);
+ }
+ return new int[] { -1, -1, -1, -1 };
+ }
+
+ @Override
+ public int[] peekMax() {
+ if (tree.size() == 1) {
+ int[] t = tree.get(0);
+ return new int[] { t[0], t[1], t[2], t[3] };
+ }
+ if (tree.size() > 1) {
+ int lc = getLeftChild(0);
+ int rc = getRightChild(0);
+ if (rc == -1) {
+ int[] t = tree.get(lc);
+ return new int[] { t[0], t[1], t[2], t[3] };
+ }
+ int[] t = (compare(lc, rc) < 0) ? tree.get(rc) : tree.get(lc);
+ return new int[] { t[0], t[1], t[2], t[3] };
+ }
+ return new int[] { -1, -1, -1, -1 };
+ }
+
+ private int[] delete(int delIx) {
+ int s = tree.size();
+ if (s > 1) {
+ int[] delEntry = tree.get(delIx);
+ int[] last = (tree.remove(s - 1));
+ if (delIx != tree.size()) {
+ tree.set(delIx, last);
+ trickleDown(delIx);
+ }
+ return delEntry;
+ } else if (s == 1) {
+ return tree.remove(0);
+ }
+ return null;
+ }
+
+ private void bubbleUp(int ix) {
+ int p = getParentIx(ix);
+ if (isAtMinLevel(ix)) {
+ if (p != NOT_EXIST && compare(p, ix) < 0) {
+ swap(ix, p);
+ bubbleUpMax(p);
+ } else {
+ bubbleUpMin(ix);
+ }
+ } else { // i is at max level
+ if (p != NOT_EXIST && compare(ix, p) < 0) {
+ swap(ix, p);
+ bubbleUpMin(p);
+ } else {
+ bubbleUpMax(ix);
+ }
+ }
+ }
+
+ private void bubbleUpMax(int ix) {
+ int gp = getGrandParent(ix);
+ if (gp != NOT_EXIST && compare(gp, ix) < 0) {
+ swap(ix, gp);
+ bubbleUpMax(gp);
+ }
+ }
+
+ private void bubbleUpMin(int ix) {
+ int gp = getGrandParent(ix);
+ if (gp != NOT_EXIST && compare(ix, gp) < 0) {
+ swap(ix, gp);
+ bubbleUpMin(gp);
+ }
+ }
+
+ private void trickleDown(int ix) {
+ if (isAtMinLevel(ix)) {
+ trickleDownMin(ix);
+ } else {
+ trickleDownMax(ix);
+ }
+ }
+
+ private void trickleDownMax(int ix) {
+ int maxIx = getMaxOfDescendents(ix);
+ if (maxIx == NOT_EXIST) {
+ return;
+ }
+ if (maxIx > getLeftChild(ix) && maxIx > getRightChild(ix)) { // A grand
+ // children
+ if (compare(ix, maxIx) < 0) {
+ swap(maxIx, ix);
+ int p = getParentIx(maxIx);
+ if (p != NOT_EXIST && compare(maxIx, p) < 0) {
+ swap(maxIx, p);
+ }
+ trickleDownMax(maxIx);
+ }
+ } else { // A children
+ if (compare(ix, maxIx) < 0) {
+ swap(ix, maxIx);
+ }
+ }
+ }
+
+ private void trickleDownMin(int ix) {
+ int minIx = getMinOfDescendents(ix);
+ if (minIx == NOT_EXIST) {
+ return;
+ }
+ if (minIx > getLeftChild(ix) && minIx > getRightChild(ix)) { // A grand
+ // children
+ if (compare(minIx, ix) < 0) {
+ swap(minIx, ix);
+ int p = getParentIx(minIx);
+ if (p != NOT_EXIST && compare(p, minIx) < 0) {
+ swap(minIx, p);
+ }
+ trickleDownMin(minIx);
+ }
+ } else { // A children
+ if (compare(minIx, ix) < 0) {
+ swap(ix, minIx);
+ }
+ }
+ }
+
+ // Min among children and grand children
+ private int getMinOfDescendents(int ix) {
+ int lc = getLeftChild(ix);
+ if (lc == NOT_EXIST) {
+ return NOT_EXIST;
+ }
+ int rc = getRightChild(ix);
+ if (rc == NOT_EXIST) {
+ return lc;
+ }
+ int min = (compare(lc, rc) < 0) ? lc : rc;
+ int[] lgc = getLeftGrandChildren(ix);
+ int[] rgc = getRightGrandChildren(ix);
+ for (int k = 0; k < 2; k++) {
+ if (lgc[k] != NOT_EXIST && compare(lgc[k], min) < 0) {
+ min = lgc[k];
+ }
+ if (rgc[k] != NOT_EXIST && compare(rgc[k], min) < 0) {
+ min = rgc[k];
+ }
+ }
+ return min;
+ }
+
+ // Max among children and grand children
+ private int getMaxOfDescendents(int ix) {
+ int lc = getLeftChild(ix);
+ if (lc == NOT_EXIST) {
+ return NOT_EXIST;
+ }
+ int rc = getRightChild(ix);
+ if (rc == NOT_EXIST) {
+ return lc;
+ }
+ int max = (compare(lc, rc) < 0) ? rc : lc;
+ int[] lgc = getLeftGrandChildren(ix);
+ int[] rgc = getRightGrandChildren(ix);
+ for (int k = 0; k < 2; k++) {
+ if (lgc[k] != NOT_EXIST && compare(max, lgc[k]) < 0) {
+ max = lgc[k];
+ }
+ if (rgc[k] != NOT_EXIST && compare(max, rgc[k]) < 0) {
+ max = rgc[k];
+ }
+ }
+ return max;
+ }
+
+ private void swap(int n1Ix, int n2Ix) {
+ int[] temp = tree.get(n1Ix);
+ tree.set(n1Ix, tree.get(n2Ix));
+ tree.set(n2Ix, temp);
+ }
+
+ private int getParentIx(int i) {
+ if (i == 0) {
+ return NOT_EXIST;
+ }
+ return (i - 1) / 2;
+ }
+
+ private int getGrandParent(int i) {
+ int p = getParentIx(i);
+ return p != -1 ? getParentIx(p) : NOT_EXIST;
+ }
+
+ private int getLeftChild(int i) {
+ int lc = 2 * i + 1;
+ return lc < tree.size() ? lc : NOT_EXIST;
+ }
+
+ private int[] getLeftGrandChildren(int i) {
+ int lc = getLeftChild(i);
+ return lc != NOT_EXIST ? new int[] { getLeftChild(lc),
+ getRightChild(lc) } : new int[] { NOT_EXIST, NOT_EXIST };
+ }
+
+ private int getRightChild(int i) {
+ int rc = 2 * i + 2;
+ return rc < tree.size() ? rc : -1;
+ }
+
+ private int[] getRightGrandChildren(int i) {
+ int rc = getRightChild(i);
+ return rc != NOT_EXIST ? new int[] { getLeftChild(rc),
+ getRightChild(rc) } : new int[] { NOT_EXIST, NOT_EXIST };
+ }
+
+ private boolean isAtMinLevel(int i) {
+ int l = getLevel(i);
+ return l % 2 == 0 ? true : false;
+ }
+
+ private int getLevel(int i) {
+ if (i == 0) {
+ return 0;
+ }
+ int l = (int) Math.floor(Math.log(i) / Math.log(2));
+ if (i == (((int) Math.pow(2, (l + 1))) - 1)) {
+ return (l + 1);
+ }
+ return l;
+ }
+
+ private ByteBuffer getFrame(int frameIx) {
+ return (memMgr.getFrame(frameIx));
+ }
+
+ // first < sec : -1
+ private int compare(int nodeSIx1, int nodeSIx2) {
+ int[] n1 = tree.get(nodeSIx1);
+ int[] n2 = tree.get(nodeSIx2);
+ return (compare(n1, n2));
+ }
+
+ // first < sec : -1
+ private int compare(int[] n1, int[] n2) {
+ // Compare Run Numbers
+ if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
+ return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
+ }
+
+ // Compare Poor man Normalized Keys
+ if (n1[PNK_IX] != n2[PNK_IX]) {
+ return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1
+ : 1;
+ }
+
+ return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]),
+ n1[OFFSET_IX], n2[OFFSET_IX]);
+ }
+
+ private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset,
+ int r2StartOffset) {
+ byte[] b1 = fr1.array();
+ byte[] b2 = fr2.array();
+ fta1.reset(fr1);
+ fta2.reset(fr2);
+ int headerLen = BSTNodeUtil.HEADER_SIZE;
+ r1StartOffset += headerLen;
+ r2StartOffset += headerLen;
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1)
+ * 4);
+ int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
+ int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
+ * 4);
+ int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+ int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+
+ public String _debugPrintTree() {
+ String s = "";
+ for (int i = 0; i < tree.size(); i++) {
+ int[] n = tree.get(i);
+ s += "\t[" + i + "](" + n[RUN_ID_IX] + ", " + n[FRAME_IX] + ", "
+ + n[OFFSET_IX] + "), ";
+ }
+ return s;
+ }
+}
\ No newline at end of file