Classes for the implementation of external sorting with replacement selection. Contains classes for sorting (with and without an output limit) and for memory management.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@818 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 7647e50..6573ffa 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -59,7 +59,23 @@
         }
         return false;
     }
-
+    
+    /*
+     * Added for the replacement-selection sort operator: the bytes array must
+     * already contain the tuple header (the field offsets).
+     */
+    public boolean append(byte[] bytes, int offset, int length) {
+        if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset, length);
+            tupleDataEndOffset += length;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            return true;
+        }
+        return false;
+    }
+    
     public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
         int length = tEndOffset - tStartOffset;
         if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
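For illustration (not part of this patch), the new overload is intended for tuples that were stashed away verbatim, field-offset header included, and appended back later. A hypothetical helper, assuming an accessor positioned on a source frame:

    // Stash tuple i of `acc` (field-offset header included) into a byte[] and
    // re-append it later through the new raw-bytes overload.
    static boolean stashAndReappend(FrameTupleAccessor acc, int i, FrameTupleAppender dest) {
        int start = acc.getTupleStartOffset(i);
        int length = acc.getTupleEndOffset(i) - start;
        byte[] slot = new byte[length];
        System.arraycopy(acc.getBuffer().array(), start, slot, 0, length);
        // ... the tuple may sit in the slot for a while (e.g. inside a memory manager) ...
        return dest.append(slot, 0, length); // false when the destination frame is full
    }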
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
new file mode 100644
index 0000000..28c3f34
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
@@ -0,0 +1,797 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * @author pouria
+ * 
+ *         Memory manager based on a binary search tree (BST), where the free
+ *         slot size is the key of each BST node. Each node represents a class
+ *         of free slots: all free slots of the exact same length are chained
+ *         in a linked list whose head is the BST node. The BST is not stored
+ *         as a separate data structure; the free slots themselves hold the
+ *         BST nodes, using the logical layout defined in BSTNodeUtil.
+ * 
+ */
+public class BSTMemMgr implements IMemoryManager {
+
+	private final IHyracksCommonContext ctx;
+	public static int FRAME_SIZE;
+
+	private ByteBuffer[] frames;
+	private ByteBuffer convertBuffer;
+	private Slot root;
+	private Slot result; // A reusable object to hold one node returned as
+							// methods result
+	private Slot insertSlot; // A reusable object to hold one node in the insert
+								// process
+	private Slot[] par_res;
+	private int lastFrame;
+
+	private static int _debug_free_slots = 0;
+	private static int _debug_tree_size = 0;
+	private int _debug_total_lookup_steps;
+	private int _debug_total_lookup_counts;
+	private int _debug_depth_counter;
+	private int _debug_max_depth;
+
+	public BSTMemMgr(IHyracksCommonContext ctx, int memSize) {
+		this.ctx = ctx;
+		FRAME_SIZE = ctx.getFrameSize();
+		convertBuffer = ByteBuffer.allocate(4);
+		frames = new ByteBuffer[memSize];
+		lastFrame = -1;
+		root = new Slot();
+		insertSlot = new Slot();
+		result = new Slot();
+		par_res = new Slot[] { new Slot(), new Slot() };
+		_debug_total_lookup_counts = 0;
+		_debug_total_lookup_steps = 0;
+	}
+
+	/**
+	 * result is the container sent by the caller to hold the results
+	 */
+	@Override
+	public void allocate(int length, Slot result) throws HyracksDataException {
+		_debug_total_lookup_counts++;
+		search(length, par_res);
+		if (par_res[1].isNull()) {
+			addFrame(par_res);
+			if (par_res[1].isNull()) {
+				return;
+			}
+		}
+
+		int sl = BSTNodeUtil.getLength(par_res[1], frames, convertBuffer);
+		int acLen = BSTNodeUtil.getActualLength(length);
+		if (shouldSplit(sl, acLen)) {
+			int[] s = split(par_res[1], par_res[0], acLen);
+			int insertLen = BSTNodeUtil.getLength(s[2], s[3], frames,
+					convertBuffer);
+			insert(s[2], s[3], insertLen); // inserting second half of the split
+											// slot
+			BSTNodeUtil.setHeaderFooter(s[0], s[1], length, false, frames);
+			result.set(s[0], s[1]);
+			return;
+		}
+		allocate(par_res[1], par_res[0], length, result);
+	}
+
+	@Override
+	public int unallocate(Slot s) throws HyracksDataException {
+		int usedLen = BSTNodeUtil.getLength(s, frames, convertBuffer);
+		int actualLen = BSTNodeUtil.getActualLength(usedLen);
+		int fix = s.getFrameIx();
+		int off = s.getOffset();
+
+		int prevMemSlotFooter_off = ((off - BSTNodeUtil.HEADER_SIZE) >= 0 ? (off - BSTNodeUtil.HEADER_SIZE)
+				: BSTNodeUtil.INVALID_INDEX);
+		int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
+		int nextMemSlotHeader_off = (t < FRAME_SIZE ? t
+				: BSTNodeUtil.INVALID_INDEX);
+		// Remember: next and prev memory slots have the same frame index as the
+		// unallocating slot
+		if (!isNodeNull(fix, prevMemSlotFooter_off)
+				&& BSTNodeUtil.isFree(fix, prevMemSlotFooter_off, frames)) {
+			int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooter_off,
+					frames, convertBuffer);
+			removeFromList(fix, prevMemSlotFooter_off - leftLength
+					- BSTNodeUtil.HEADER_SIZE);
+			int concatLength = actualLen + leftLength + 2
+					* BSTNodeUtil.HEADER_SIZE;
+			if (!isNodeNull(fix, nextMemSlotHeader_off)
+					&& BSTNodeUtil.isFree(fix, nextMemSlotHeader_off, frames)) {
+				removeFromList(fix, nextMemSlotHeader_off);
+				concatLength += BSTNodeUtil.getLength(fix,
+						nextMemSlotHeader_off, frames, convertBuffer)
+						+ 2
+						* BSTNodeUtil.HEADER_SIZE;
+			}
+			// the newly merged slot starts at the previous slot's offset
+			insert(fix, prevMemSlotFooter_off - leftLength
+					- BSTNodeUtil.HEADER_SIZE, concatLength);
+			return concatLength;
+
+		} else if (!isNodeNull(fix, nextMemSlotHeader_off)
+				&& BSTNodeUtil.isFree(fix, nextMemSlotHeader_off, frames)) {
+			removeFromList(fix, nextMemSlotHeader_off);
+			int concatLength = actualLen
+					+ BSTNodeUtil.getLength(fix, nextMemSlotHeader_off, frames,
+							convertBuffer) + 2 * BSTNodeUtil.HEADER_SIZE;
+			insert(fix, off, concatLength); // newly (merged) slot starts at the
+											// unallocating slot offset
+			return concatLength;
+		}
+		// unallocating slot is not merging with any neighbor
+		insert(fix, off, actualLen);
+		return actualLen;
+	}
+
+	@Override
+	public boolean readTuple(int frameIx, int offset, FrameTupleAppender dest) {
+		int offToRead = offset + BSTNodeUtil.HEADER_SIZE;
+		int length = BSTNodeUtil.getLength(frameIx, offset, frames,
+				convertBuffer);
+		return dest.append(frames[frameIx].array(), offToRead, length);
+	}
+
+	@Override
+	public boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src,
+			int tIndex) {
+		int offToCopy = offset + BSTNodeUtil.HEADER_SIZE;
+		int tStartOffset = src.getTupleStartOffset(tIndex);
+		int tEndOffset = src.getTupleEndOffset(tIndex);
+		int tupleLength = tEndOffset - tStartOffset;
+		ByteBuffer srcBuffer = src.getBuffer();
+		System.arraycopy(srcBuffer.array(), tStartOffset,
+				frames[frameIx].array(), offToCopy, tupleLength);
+		return true;
+	}
+
+	@Override
+	public ByteBuffer getFrame(int frameIndex) {
+		return frames[frameIndex];
+	}
+
+	public String _debug_getAvgSearchPath() {
+		double avg = (((double) _debug_total_lookup_steps) / ((double) _debug_total_lookup_counts));
+		return "\nTotal allocation requests:\t" + _debug_total_lookup_counts
+				+ "\nAvg Allocation Path Length:\t" + avg
+				+ "\nMax BST Depth:\t" + _debug_max_depth;
+	}
+
+	public void _debug_decLookupCount() {
+		_debug_total_lookup_counts--;
+	}
+
+	/**
+	 * 
+	 * @param p_r
+	 *            is the container passed by the caller to contain the results
+	 * @throws HyracksDataException
+	 */
+	private void addFrame(Slot[] p_r) throws HyracksDataException {
+		_debug_depth_counter = 0;
+		clear(p_r);
+		if ((lastFrame + 1) >= frames.length) {
+			return;
+		}
+		frames[++lastFrame] = allocateFrame();
+		int l = FRAME_SIZE - 2 * BSTNodeUtil.HEADER_SIZE;
+		BSTNodeUtil.setHeaderFooter(lastFrame, 0, l, true, frames);
+		initNewNode(lastFrame, 0);
+
+		p_r[1].copy(root);
+		if (p_r[1].isNull()) { // root is null
+			root.set(lastFrame, 0);
+			initNewNode(root.getFrameIx(), root.getOffset());
+			p_r[1].copy(root);
+			return;
+		}
+
+		while (!p_r[1].isNull()) {
+			_debug_depth_counter++;
+			if (BSTNodeUtil.getLength(p_r[1], frames, convertBuffer) == l) {
+				append(p_r[1].getFrameIx(), p_r[1].getOffset(), lastFrame, 0);
+				p_r[1].set(lastFrame, 0);
+				if (_debug_depth_counter > _debug_max_depth) {
+					_debug_max_depth = _debug_depth_counter;
+				}
+				return;
+			}
+			if (l < BSTNodeUtil.getLength(p_r[1], frames, convertBuffer)) {
+				if (isNodeNull(BSTNodeUtil.leftChild_fIx(p_r[1], frames,
+						convertBuffer), BSTNodeUtil.leftChild_offset(p_r[1],
+						frames, convertBuffer))) {
+					BSTNodeUtil.setLeftChild(p_r[1].getFrameIx(),
+							p_r[1].getOffset(), lastFrame, 0, frames);
+					p_r[0].copy(p_r[1]);
+					p_r[1].set(lastFrame, 0);
+					if (_debug_depth_counter > _debug_max_depth) {
+						_debug_max_depth = _debug_depth_counter;
+					}
+					return;
+				} else {
+					p_r[0].copy(p_r[1]);
+					p_r[1].set(BSTNodeUtil.leftChild_fIx(p_r[1], frames,
+							convertBuffer), BSTNodeUtil.leftChild_offset(
+							p_r[1], frames, convertBuffer));
+				}
+			} else {
+				if (isNodeNull(BSTNodeUtil.rightChild_fIx(p_r[1], frames,
+						convertBuffer), BSTNodeUtil.rightChild_offset(p_r[1],
+						frames, convertBuffer))) {
+					BSTNodeUtil.setRightChild(p_r[1].getFrameIx(),
+							p_r[1].getOffset(), lastFrame, 0, frames);
+					p_r[0].copy(p_r[1]);
+					p_r[1].set(lastFrame, 0);
+					if (_debug_depth_counter > _debug_max_depth) {
+						_debug_max_depth = _debug_depth_counter;
+					}
+					return;
+				} else {
+					p_r[0].copy(p_r[1]);
+					p_r[1].set(BSTNodeUtil.rightChild_fIx(p_r[1], frames,
+							convertBuffer), BSTNodeUtil.rightChild_offset(
+							p_r[1], frames, convertBuffer));
+				}
+			}
+		}
+		throw new HyracksDataException(
+				"New Frame could not be added to BSTMemMgr");
+	}
+
+	private void insert(int fix, int off, int length)
+			throws HyracksDataException {
+		_debug_depth_counter = 0;
+		BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
+		initNewNode(fix, off);
+
+		if (root.isNull()) {
+			root.set(fix, off);
+			return;
+		}
+
+		insertSlot.clear();
+		insertSlot.copy(root);
+		while (!insertSlot.isNull()) {
+			_debug_depth_counter++;
+			int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames,
+					convertBuffer);
+			if (curSlotLen == length) {
+				append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix,
+						off);
+				if (_debug_depth_counter > _debug_max_depth) {
+					_debug_max_depth = _debug_depth_counter;
+				}
+				return;
+			}
+			if (length < curSlotLen) {
+				int lc_fix = BSTNodeUtil.leftChild_fIx(insertSlot, frames,
+						convertBuffer);
+				int lc_off = BSTNodeUtil.leftChild_offset(insertSlot, frames,
+						convertBuffer);
+				if (isNodeNull(lc_fix, lc_off)) {
+					initNewNode(fix, off);
+					BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(),
+							insertSlot.getOffset(), fix, off, frames);
+					if (_debug_depth_counter > _debug_max_depth) {
+						_debug_max_depth = _debug_depth_counter;
+					}
+					return;
+				} else {
+					insertSlot.set(lc_fix, lc_off);
+				}
+			} else {
+				int rc_fix = BSTNodeUtil.rightChild_fIx(insertSlot, frames,
+						convertBuffer);
+				int rc_off = BSTNodeUtil.rightChild_offset(insertSlot, frames,
+						convertBuffer);
+				if (isNodeNull(rc_fix, rc_off)) {
+					initNewNode(fix, off);
+					BSTNodeUtil.setRightChild(insertSlot.getFrameIx(),
+							insertSlot.getOffset(), fix, off, frames);
+					if (_debug_depth_counter > _debug_max_depth) {
+						_debug_max_depth = _debug_depth_counter;
+					}
+					return;
+				} else {
+					insertSlot.set(rc_fix, rc_off);
+				}
+			}
+		}
+		throw new HyracksDataException(
+				"Failure in node insertion into BST in BSTMemMgr");
+	}
+
+	/**
+	 * @param length
+	 * @param target
+	 *            is the container sent by the caller to hold the results
+	 */
+	private void search(int length, Slot[] target) {
+		clear(target);
+		result.clear();
+
+		if (root.isNull()) {
+			return;
+		}
+
+		Slot lastLeftParent = new Slot();
+		Slot lastLeft = new Slot();
+		Slot parent = new Slot();
+		result.copy(root);
+
+		while (!result.isNull()) {
+			_debug_total_lookup_steps++;
+			if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) {
+				target[0].copy(parent);
+				target[1].copy(result);
+				return;
+			}
+			if (length < BSTNodeUtil.getLength(result, frames, convertBuffer)) {
+				lastLeftParent.copy(parent);
+				lastLeft.copy(result);
+				parent.copy(result);
+				int fix = BSTNodeUtil.leftChild_fIx(result, frames,
+						convertBuffer);
+				int off = BSTNodeUtil.leftChild_offset(result, frames,
+						convertBuffer);
+				result.set(fix, off);
+			} else {
+				parent.copy(result);
+				int fix = BSTNodeUtil.rightChild_fIx(result, frames,
+						convertBuffer);
+				int off = BSTNodeUtil.rightChild_offset(result, frames,
+						convertBuffer);
+				result.set(fix, off);
+			}
+		}
+
+		target[0].copy(lastLeftParent);
+		target[1].copy(lastLeft);
+
+	}
+
+	private void append(int headFix, int headOff, int nodeFix, int nodeOff) {
+		initNewNode(nodeFix, nodeOff);
+
+		// frame index and offset of the current next of head
+		int fix = BSTNodeUtil.next_fIx(headFix, headOff, frames, convertBuffer);
+		int off = BSTNodeUtil.next_offset(headFix, headOff, frames,
+				convertBuffer);
+		BSTNodeUtil.setNext(nodeFix, nodeOff, fix, off, frames);
+
+		if (!isNodeNull(fix, off)) {
+			BSTNodeUtil.setPrev(fix, off, nodeFix, nodeOff, frames);
+		}
+		BSTNodeUtil.setPrev(nodeFix, nodeOff, headFix, headOff, frames);
+		BSTNodeUtil.setNext(headFix, headOff, nodeFix, nodeOff, frames);
+	}
+
+	private int[] split(Slot listHead, Slot parent, int length) {
+		int l2 = BSTNodeUtil.getLength(listHead, frames, convertBuffer)
+				- length - 2 * BSTNodeUtil.HEADER_SIZE;
+		// We split the node after slots-list head
+		if (!isNodeNull(BSTNodeUtil.next_fIx(listHead, frames, convertBuffer),
+				BSTNodeUtil.next_offset(listHead, frames, convertBuffer))) {
+			int afterHeadFix = BSTNodeUtil.next_fIx(listHead, frames,
+					convertBuffer);
+			int afterHeadOff = BSTNodeUtil.next_offset(listHead, frames,
+					convertBuffer);
+			int afHNextFix = BSTNodeUtil.next_fIx(afterHeadFix, afterHeadOff,
+					frames, convertBuffer);
+			int afHNextOff = BSTNodeUtil.next_offset(afterHeadFix,
+					afterHeadOff, frames, convertBuffer);
+			BSTNodeUtil.setNext(listHead.getFrameIx(), listHead.getOffset(),
+					afHNextFix, afHNextOff, frames);
+			if (!isNodeNull(afHNextFix, afHNextOff)) {
+				BSTNodeUtil.setPrev(afHNextFix, afHNextOff,
+						listHead.getFrameIx(), listHead.getOffset(), frames);
+			}
+			int secondOffset = afterHeadOff + length + 2
+					* BSTNodeUtil.HEADER_SIZE;
+			BSTNodeUtil.setHeaderFooter(afterHeadFix, afterHeadOff, length,
+					true, frames);
+			BSTNodeUtil.setHeaderFooter(afterHeadFix, secondOffset, l2, true,
+					frames);
+
+			return new int[] { afterHeadFix, afterHeadOff, afterHeadFix,
+					secondOffset };
+		}
+		// We split the head
+		int secondOffset = listHead.getOffset() + length + 2
+				* BSTNodeUtil.HEADER_SIZE;
+		BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(),
+				listHead.getOffset(), length, true, frames);
+		BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), secondOffset, l2,
+				true, frames);
+
+		fixTreePtrs(listHead.getFrameIx(), listHead.getOffset(),
+				parent.getFrameIx(), parent.getOffset());
+		return new int[] { listHead.getFrameIx(), listHead.getOffset(),
+				listHead.getFrameIx(), secondOffset };
+	}
+
+	private void fixTreePtrs(int node_fix, int node_off, int par_fix,
+			int par_off) {
+		int nlc_fix = BSTNodeUtil.leftChild_fIx(node_fix, node_off, frames,
+				convertBuffer);
+		int nlc_off = BSTNodeUtil.leftChild_offset(node_fix, node_off, frames,
+				convertBuffer);
+		int nrc_fix = BSTNodeUtil.rightChild_fIx(node_fix, node_off, frames,
+				convertBuffer);
+		int nrc_off = BSTNodeUtil.rightChild_offset(node_fix, node_off, frames,
+				convertBuffer);
+
+		int status = -1; // (status==0 if node is left child of parent)
+							// (status==1 if node is right child of parent)
+		if (!isNodeNull(par_fix, par_off)) {
+			int nlen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
+					node_fix, node_off, frames, convertBuffer));
+			int plen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
+					par_fix, par_off, frames, convertBuffer));
+			status = ((nlen < plen) ? 0 : 1);
+		}
+
+		// Node has two children
+		if (!isNodeNull(nlc_fix, nlc_off) && !isNodeNull(nrc_fix, nrc_off)) {
+			int pmin_fix = node_fix;
+			int pmin_off = node_off;
+			int min_fix = nrc_fix;
+			int min_off = nrc_off;
+			int next_left_fix = BSTNodeUtil.leftChild_fIx(min_fix, min_off,
+					frames, convertBuffer);
+			int next_left_off = BSTNodeUtil.leftChild_offset(min_fix, min_off,
+					frames, convertBuffer);
+
+			while (!isNodeNull(next_left_fix, next_left_off)) {
+				pmin_fix = min_fix;
+				pmin_off = min_off;
+				min_fix = next_left_fix;
+				min_off = next_left_off;
+				// min now points to the current (old) next-left node
+				next_left_fix = BSTNodeUtil.leftChild_fIx(min_fix, min_off,
+						frames, convertBuffer);
+				next_left_off = BSTNodeUtil.leftChild_offset(min_fix, min_off,
+						frames, convertBuffer);
+			}
+
+			if ((nrc_fix == min_fix) && (nrc_off == min_off)) { // nrc is the
+																// same as min
+				BSTNodeUtil.setLeftChild(nrc_fix, nrc_off, nlc_fix, nlc_off,
+						frames);
+			} else { // min is different from nrc
+				int min_right_fix = BSTNodeUtil.rightChild_fIx(min_fix,
+						min_off, frames, convertBuffer);
+				int min_right_off = BSTNodeUtil.rightChild_offset(min_fix,
+						min_off, frames, convertBuffer);
+				BSTNodeUtil.setRightChild(min_fix, min_off, nrc_fix, nrc_off,
+						frames);
+				BSTNodeUtil.setLeftChild(min_fix, min_off, nlc_fix, nlc_off,
+						frames);
+				BSTNodeUtil.setLeftChild(pmin_fix, pmin_off, min_right_fix,
+						min_right_off, frames);
+			}
+
+			// Now dealing with the parent
+			if (!isNodeNull(par_fix, par_off)) {
+				if (status == 0) {
+					BSTNodeUtil.setLeftChild(par_fix, par_off, min_fix,
+							min_off, frames);
+				} else if (status == 1) {
+					BSTNodeUtil.setRightChild(par_fix, par_off, min_fix,
+							min_off, frames);
+				}
+			} else { // No parent (node was the root)
+				root.set(min_fix, min_off);
+			}
+			return;
+		}
+
+		else if (!isNodeNull(nlc_fix, nlc_off)) { // Node has only left child
+			if (status == 0) {
+				BSTNodeUtil.setLeftChild(par_fix, par_off, nlc_fix, nlc_off,
+						frames);
+			} else if (status == 1) {
+				BSTNodeUtil.setRightChild(par_fix, par_off, nlc_fix, nlc_off,
+						frames);
+			} else if (status == -1) { // No parent, so node is root
+				root.set(nlc_fix, nlc_off);
+			}
+			return;
+		}
+
+		else if (!isNodeNull(nrc_fix, nrc_off)) { // Node has only right child
+			if (status == 0) {
+				BSTNodeUtil.setLeftChild(par_fix, par_off, nrc_fix, nrc_off,
+						frames);
+			} else if (status == 1) {
+				BSTNodeUtil.setRightChild(par_fix, par_off, nrc_fix, nrc_off,
+						frames);
+			} else if (status == -1) { // No parent, so node is root
+				root.set(nrc_fix, nrc_off);
+			}
+			return;
+		}
+
+		else { // Node is leaf (no children)
+			if (status == 0) {
+				BSTNodeUtil.setLeftChild(par_fix, par_off,
+						BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX,
+						frames);
+			} else if (status == 1) {
+				BSTNodeUtil.setRightChild(par_fix, par_off,
+						BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX,
+						frames);
+			} else { // node was the only node in the tree
+				root.clear();
+			}
+			return;
+		}
+	}
+
+	/**
+	 * Allocation with no splitting but padding
+	 * 
+	 * @param node
+	 * @param parent
+	 * @param result
+	 *            is the container sent by the caller to hold the results
+	 */
+	private void allocate(Slot node, Slot parent, int length, Slot result) {
+		int nextFix = BSTNodeUtil.next_fIx(node, frames, convertBuffer);
+		int nextOff = BSTNodeUtil.next_offset(node, frames, convertBuffer);
+		if (!isNodeNull(nextFix, nextOff)) {
+			int nextOfNext_fix = BSTNodeUtil.next_fIx(nextFix, nextOff, frames,
+					convertBuffer);
+			int nextOfNext_off = BSTNodeUtil.next_offset(nextFix, nextOff,
+					frames, convertBuffer);
+			BSTNodeUtil.setNext(node.getFrameIx(), node.getOffset(),
+					nextOfNext_fix, nextOfNext_off, frames);
+			if (!isNodeNull(nextOfNext_fix, nextOfNext_off)) {
+				BSTNodeUtil.setPrev(nextOfNext_fix, nextOfNext_off,
+						node.getFrameIx(), node.getOffset(), frames);
+			}
+			BSTNodeUtil
+					.setHeaderFooter(nextFix, nextOff, length, false, frames);
+			result.set(nextFix, nextOff);
+			return;
+		}
+
+		fixTreePtrs(node.getFrameIx(), node.getOffset(), parent.getFrameIx(),
+				parent.getOffset());
+		BSTNodeUtil.setHeaderFooter(node.getFrameIx(), node.getOffset(),
+				length, false, frames);
+		result.copy(node);
+	}
+
+	private void removeFromList(int fix, int off) {
+		int nx_fix = BSTNodeUtil.next_fIx(fix, off, frames, convertBuffer);
+		int nx_off = BSTNodeUtil.next_offset(fix, off, frames, convertBuffer);
+		int prev_fix = BSTNodeUtil.prev_fIx(fix, off, frames, convertBuffer);
+		int prev_off = BSTNodeUtil.prev_offset(fix, off, frames, convertBuffer);
+		if (!isNodeNull(prev_fix, prev_off) && !isNodeNull(nx_fix, nx_off)) {
+			BSTNodeUtil.setNext(prev_fix, prev_off, nx_fix, nx_off, frames);
+			BSTNodeUtil.setPrev(nx_fix, nx_off, prev_fix, prev_off, frames);
+			BSTNodeUtil.setNext(fix, off, BSTNodeUtil.INVALID_INDEX,
+					BSTNodeUtil.INVALID_INDEX, frames);
+			BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX,
+					BSTNodeUtil.INVALID_INDEX, frames);
+			return;
+		}
+		if (!isNodeNull(prev_fix, prev_off)) {
+			BSTNodeUtil.setNext(prev_fix, prev_off, BSTNodeUtil.INVALID_INDEX,
+					BSTNodeUtil.INVALID_INDEX, frames);
+			BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX,
+					BSTNodeUtil.INVALID_INDEX, frames);
+			return;
+		}
+
+		// We need to find the parent, so we can fix the tree
+		int par_fix = BSTNodeUtil.INVALID_INDEX;
+		int par_off = BSTNodeUtil.INVALID_INDEX;
+		int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(fix,
+				off, frames, convertBuffer));
+		fix = root.getFrameIx();
+		off = root.getOffset();
+		int curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+		while (length != curLen) {
+			par_fix = fix;
+			par_off = off;
+			if (length < curLen) {
+				fix = BSTNodeUtil.leftChild_fIx(par_fix, par_off, frames,
+						convertBuffer); // par_fix is now the old(current) fix
+				off = BSTNodeUtil.leftChild_offset(par_fix, par_off, frames,
+						convertBuffer); // par_off is now the old(current) off
+			} else {
+				fix = BSTNodeUtil.rightChild_fIx(par_fix, par_off, frames,
+						convertBuffer); // par_fix is now the old(current) fix
+				off = BSTNodeUtil.rightChild_offset(par_fix, par_off, frames,
+						convertBuffer); // par_off is now the old(current) off
+			}
+			curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+		}
+
+		if (!isNodeNull(nx_fix, nx_off)) { // it is head of the list (in the
+											// tree)
+			BSTNodeUtil.setPrev(nx_fix, nx_off, BSTNodeUtil.INVALID_INDEX,
+					BSTNodeUtil.INVALID_INDEX, frames);
+			int node_lc_fix = BSTNodeUtil.leftChild_fIx(fix, off, frames,
+					convertBuffer);
+			int node_lc_off = BSTNodeUtil.leftChild_offset(fix, off, frames,
+					convertBuffer);
+			int node_rc_fix = BSTNodeUtil.rightChild_fIx(fix, off, frames,
+					convertBuffer);
+			int node_rc_off = BSTNodeUtil.rightChild_offset(fix, off, frames,
+					convertBuffer);
+			BSTNodeUtil.setLeftChild(nx_fix, nx_off, node_lc_fix, node_lc_off,
+					frames);
+			BSTNodeUtil.setRightChild(nx_fix, nx_off, node_rc_fix, node_rc_off,
+					frames);
+			if (!isNodeNull(par_fix, par_off)) {
+				int parentLength = BSTNodeUtil.getLength(par_fix, par_off,
+						frames, convertBuffer);
+				if (length < parentLength) {
+					BSTNodeUtil.setLeftChild(par_fix, par_off, nx_fix, nx_off,
+							frames);
+				} else {
+					BSTNodeUtil.setRightChild(par_fix, par_off, nx_fix, nx_off,
+							frames);
+				}
+			}
+
+			if ((root.getFrameIx() == fix) && (root.getOffset() == off)) {
+				root.set(nx_fix, nx_off);
+			}
+
+			return;
+		}
+
+		fixTreePtrs(fix, off, par_fix, par_off);
+	}
+
+	private void clear(Slot[] s) {
+		s[0].clear();
+		s[1].clear();
+	}
+
+	private boolean isNodeNull(int frameIx, int offset) {
+		return ((frameIx == BSTNodeUtil.INVALID_INDEX)
+				|| (offset == BSTNodeUtil.INVALID_INDEX) || (frames[frameIx] == null));
+	}
+
+	private boolean shouldSplit(int slotLength, int reqLength) {
+		return ((slotLength - reqLength) >= BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE);
+	}
+
+	private void initNewNode(int frameIx, int offset) {
+		BSTNodeUtil.setLeftChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
+				BSTNodeUtil.INVALID_INDEX, frames);
+		BSTNodeUtil.setRightChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
+				BSTNodeUtil.INVALID_INDEX, frames);
+		BSTNodeUtil.setNext(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
+				BSTNodeUtil.INVALID_INDEX, frames);
+		BSTNodeUtil.setPrev(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
+				BSTNodeUtil.INVALID_INDEX, frames);
+	}
+
+	private ByteBuffer allocateFrame() {
+		return ctx.allocateFrame();
+	}
+
+	public String _debug_printMemory() {
+		_debug_free_slots = 0;
+		Slot s = new Slot(0, 0);
+		if (s.isNull()) {
+			return "memory:\tNull";
+		}
+
+		if (BSTNodeUtil.isFree(0, 0, frames)) {
+			_debug_free_slots++;
+		}
+
+		String m = "memory:\n" + _debug_printSlot(0, 0) + "\n";
+		int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(0, 0,
+				frames, convertBuffer));
+		int noff = (length + 2 * BSTNodeUtil.HEADER_SIZE >= FRAME_SIZE ? BSTNodeUtil.INVALID_INDEX
+				: length + 2 * BSTNodeUtil.HEADER_SIZE);
+		int nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length == 1) ? BSTNodeUtil.INVALID_INDEX
+				: 1)
+				: 0);
+		if (noff == BSTNodeUtil.INVALID_INDEX
+				&& nfix != BSTNodeUtil.INVALID_INDEX) {
+			noff = 0;
+		}
+		s.set(nfix, noff);
+		while (!isNodeNull(s.getFrameIx(), s.getOffset())) {
+			if (BSTNodeUtil.isFree(s.getFrameIx(), s.getOffset(), frames)) {
+				_debug_free_slots++;
+			}
+			m += _debug_printSlot(s.getFrameIx(), s.getOffset()) + "\n";
+			length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
+					s.getFrameIx(), s.getOffset(), frames, convertBuffer));
+			noff = (s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE >= FRAME_SIZE ? BSTNodeUtil.INVALID_INDEX
+					: s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE);
+			nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length - 1 == s
+					.getFrameIx()) ? BSTNodeUtil.INVALID_INDEX
+					: s.getFrameIx() + 1) : s.getFrameIx());
+			if (noff == BSTNodeUtil.INVALID_INDEX
+					&& nfix != BSTNodeUtil.INVALID_INDEX) {
+				noff = 0;
+			}
+			s.set(nfix, noff);
+		}
+		return m + "\nFree Slots:\t" + _debug_free_slots;
+	}
+
+	public String _debug_printTree() {
+		_debug_tree_size = 0;
+		Slot node = new Slot();
+		node.copy(root);
+		if (!node.isNull()) {
+			_debug_tree_size++;
+			return _debug_printSubTree(node) + "\nTree Nodes:\t"
+					+ _debug_tree_size;
+		}
+		return "Null";
+	}
+
+	private String _debug_printSubTree(Slot r) {
+		Slot node = new Slot();
+		node.copy(r);
+		int fix = node.getFrameIx();
+		int off = node.getOffset();
+		int lfix = BSTNodeUtil.leftChild_fIx(node, frames, convertBuffer);
+		int loff = BSTNodeUtil.leftChild_offset(node, frames, convertBuffer);
+		int rfix = BSTNodeUtil.rightChild_fIx(node, frames, convertBuffer);
+		int roff = BSTNodeUtil.rightChild_offset(node, frames, convertBuffer);
+		int nfix = BSTNodeUtil.next_fIx(node, frames, convertBuffer);
+		int noff = BSTNodeUtil.next_offset(node, frames, convertBuffer);
+		int pfix = BSTNodeUtil.prev_fIx(node, frames, convertBuffer);
+		int poff = BSTNodeUtil.prev_offset(node, frames, convertBuffer);
+
+		String s = "{" + r.getFrameIx() + ", " + r.getOffset() + " (Len: "
+				+ BSTNodeUtil.getLength(fix, off, frames, convertBuffer)
+				+ ") - " + "(LC: " + _debug_printSlot(lfix, loff) + ") - "
+				+ "(RC: " + _debug_printSlot(rfix, roff) + ") - " + "(NX: "
+				+ _debug_printSlot(nfix, noff) + ") - " + "(PR: "
+				+ _debug_printSlot(pfix, poff) + ")  }\n";
+		if (!isNodeNull(lfix, loff)) {
+			_debug_tree_size++;
+			s += _debug_printSubTree(new Slot(lfix, loff)) + "\n";
+		}
+		if (!isNodeNull(rfix, roff)) {
+			_debug_tree_size++;
+			s += _debug_printSubTree(new Slot(rfix, roff)) + "\n";
+		}
+
+		return s;
+	}
+
+	private String _debug_printSlot(int fix, int off) {
+		if (isNodeNull(fix, off)) {
+			return BSTNodeUtil.INVALID_INDEX + ", " + BSTNodeUtil.INVALID_INDEX;
+		}
+		int l = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+		int al = BSTNodeUtil.getActualLength(l);
+		boolean f = BSTNodeUtil.isFree(fix, off, frames);
+		return fix + ", " + off + " (free: " + f + ") (Len: " + l
+				+ ") (actual len: " + al + ") ";
+	}
+}
\ No newline at end of file
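For context (illustrative only, not part of this patch), a minimal sketch of how a caller such as the run generator drives this memory manager for a single tuple. Here `ctx`, `memSize`, `acc` (a FrameTupleAccessor positioned on an input frame), `appender` (a FrameTupleAppender over the output frame) and `tIndex` are assumed to be set up elsewhere, and the enclosing method would declare `throws HyracksDataException`:

    IMemoryManager memMgr = new BSTMemMgr(ctx, memSize);
    int tLength = acc.getTupleEndOffset(tIndex) - acc.getTupleStartOffset(tIndex);
    Slot slot = new Slot();
    memMgr.allocate(tLength, slot);          // slot stays null if no free slot (and no new frame) is available
    if (!slot.isNull()) {
        memMgr.writeTuple(slot.getFrameIx(), slot.getOffset(), acc, tIndex);  // copy the tuple into the slot
        // ... later, when the selection tree picks this tuple for output:
        memMgr.readTuple(slot.getFrameIx(), slot.getOffset(), appender);      // append it to the output frame
        memMgr.unallocate(slot);             // free the slot; it coalesces with free neighbors
    }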
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
new file mode 100644
index 0000000..dc1bffa
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
@@ -0,0 +1,277 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author pouria
+ * 
+ *         Utility methods used extensively within BSTMemMgr.
+ * 
+ *         The main functionality consists of getters/setters for the various
+ *         pointers needed during BST traversal, along with methods for
+ *         reading/writing the length, header, and footer of the free slots
+ *         that serve as containers for BST nodes.
+ */
+public class BSTNodeUtil {
+
+	static final int TOTAL_FRAME_SIZE = BSTMemMgr.FRAME_SIZE;
+	static final int MINIMUM_FREE_SLOT_SIZE = 32;
+
+	static final int FRAME_PTR_SIZE = 4;
+	static final int OFFSET_SIZE = 2;
+
+	static final int HEADER_SIZE = 2;
+	static final int HEADER_INDEX = 0;
+
+	static final int LEFT_CHILD_FRAME_INDEX = HEADER_INDEX + HEADER_SIZE;
+	static final int LEFT_CHILD_OFFSET_INDEX = LEFT_CHILD_FRAME_INDEX
+			+ FRAME_PTR_SIZE;
+
+	static final int RIGHT_CHILD_FRAME_INDEX = LEFT_CHILD_OFFSET_INDEX
+			+ OFFSET_SIZE;
+	static final int RIGHT_CHILD_OFFSET_INDEX = RIGHT_CHILD_FRAME_INDEX
+			+ FRAME_PTR_SIZE;
+
+	static final int NEXT_FRAME_INDEX = RIGHT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
+	static final int NEXT_OFFSET_INDEX = NEXT_FRAME_INDEX + FRAME_PTR_SIZE;
+
+	static final int PREV_FRAME_INDEX = NEXT_OFFSET_INDEX + OFFSET_SIZE;
+	static final int PREV_OFFSET_INDEX = PREV_FRAME_INDEX + FRAME_PTR_SIZE;
+
+	private static final byte INVALID = -128;
+	private static final byte MASK = 127;
+	static final int INVALID_INDEX = -1;
+
+	/*
+	 * Structure of a free slot:
+	 *   [HEADER][LEFT_CHILD][RIGHT_CHILD][NEXT][PREV]...[FOOTER]
+	 *   The MSB of the HEADER is set to 1 (the slot is free; its contents are not live data).
+	 * 
+	 * Structure of a used slot:
+	 *   [HEADER]...[FOOTER]
+	 *   The MSB of the HEADER is set to 0 (the slot is occupied; its contents are a live tuple).
+	 */
+
+	static int leftChild_fIx(Slot s, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return leftChild_fIx(s.getFrameIx(), s.getOffset(), frames,
+				convertBuffer);
+	}
+
+	static int leftChild_offset(Slot s, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return leftChild_offset(s.getFrameIx(), s.getOffset(), frames,
+				convertBuffer);
+	}
+
+	static int leftChild_fIx(int frameIx, int offset, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_FRAME_INDEX,
+				FRAME_PTR_SIZE, convertBuffer));
+
+	}
+
+	static int leftChild_offset(int frameIx, int offset, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return (retrieveAsInt(frames[frameIx],
+				offset + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+	}
+
+	static void setLeftChild(Slot node, Slot lc, ByteBuffer[] frames) {
+		setLeftChild(node.getFrameIx(), node.getOffset(), lc.getFrameIx(),
+				lc.getOffset(), frames);
+	}
+
+	static void setLeftChild(int nodeFix, int nodeOff, int lcFix, int lcOff,
+			ByteBuffer[] frames) {
+		storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_FRAME_INDEX,
+				FRAME_PTR_SIZE, lcFix);
+		storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_OFFSET_INDEX,
+				OFFSET_SIZE, lcOff);
+	}
+
+	static int rightChild_fIx(Slot s, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return rightChild_fIx(s.getFrameIx(), s.getOffset(), frames,
+				convertBuffer);
+	}
+
+	static int rightChild_offset(Slot s, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return rightChild_offset(s.getFrameIx(), s.getOffset(), frames,
+				convertBuffer);
+	}
+
+	static int rightChild_fIx(int frameIx, int offset, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return (retrieveAsInt(frames[frameIx],
+				offset + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
+	}
+
+	static int rightChild_offset(int frameIx, int offset, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return (retrieveAsInt(frames[frameIx], offset
+				+ RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+	}
+
+	static void setRightChild(Slot node, Slot rc, ByteBuffer[] frames) {
+		setRightChild(node.getFrameIx(), node.getOffset(), rc.getFrameIx(),
+				rc.getOffset(), frames);
+	}
+
+	static void setRightChild(int nodeFix, int nodeOff, int rcFix, int rcOff,
+			ByteBuffer[] frames) {
+		storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_FRAME_INDEX,
+				FRAME_PTR_SIZE, rcFix);
+		storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_OFFSET_INDEX,
+				OFFSET_SIZE, rcOff);
+	}
+
+	static int next_fIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+		return next_fIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+	}
+
+	static int next_offset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+		return next_offset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+	}
+
+	static int next_fIx(int frameIx, int offset, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return (retrieveAsInt(frames[frameIx], offset + NEXT_FRAME_INDEX,
+				FRAME_PTR_SIZE, convertBuffer));
+	}
+
+	static int next_offset(int frameIx, int offset, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return (retrieveAsInt(frames[frameIx], offset + NEXT_OFFSET_INDEX,
+				OFFSET_SIZE, convertBuffer));
+	}
+
+	static void setNext(Slot node, Slot next, ByteBuffer[] frames) {
+		setNext(node.getFrameIx(), node.getOffset(), next.getFrameIx(),
+				next.getOffset(), frames);
+	}
+
+	static void setNext(int nodeFix, int nodeOff, int nFix, int nOff,
+			ByteBuffer[] frames) {
+		storeInt(frames[nodeFix], nodeOff + NEXT_FRAME_INDEX, FRAME_PTR_SIZE,
+				nFix);
+		storeInt(frames[nodeFix], nodeOff + NEXT_OFFSET_INDEX, OFFSET_SIZE,
+				nOff);
+	}
+
+	static int prev_fIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+		return prev_fIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+	}
+
+	static int prev_offset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+		return prev_offset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+	}
+
+	static int prev_fIx(int frameIx, int offset, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return (retrieveAsInt(frames[frameIx], offset + PREV_FRAME_INDEX,
+				FRAME_PTR_SIZE, convertBuffer));
+	}
+
+	static int prev_offset(int frameIx, int offset, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		return (retrieveAsInt(frames[frameIx], offset + PREV_OFFSET_INDEX,
+				OFFSET_SIZE, convertBuffer));
+	}
+
+	static void setPrev(Slot node, Slot prev, ByteBuffer[] frames) {
+		setPrev(node.getFrameIx(), node.getOffset(), prev.getFrameIx(),
+				prev.getOffset(), frames);
+	}
+
+	static void setPrev(int nodeFix, int nodeOff, int pFix, int pOff,
+			ByteBuffer[] frames) {
+		storeInt(frames[nodeFix], nodeOff + PREV_FRAME_INDEX, FRAME_PTR_SIZE,
+				pFix);
+		storeInt(frames[nodeFix], nodeOff + PREV_OFFSET_INDEX, OFFSET_SIZE,
+				pOff);
+	}
+
+	static boolean slotsTheSame(Slot s, Slot t) {
+		return ((s.getFrameIx() == t.getFrameIx()) && (s.getOffset() == t
+				.getOffset()));
+	}
+
+	static void setHeaderFooter(int frameIx, int offset, int usedLength,
+			boolean isFree, ByteBuffer[] frames) {
+		int slotLength = getActualLength(usedLength);
+		int footerOffset = offset + HEADER_SIZE + slotLength;
+		storeInt(frames[frameIx], offset, HEADER_SIZE, usedLength);
+		storeInt(frames[frameIx], footerOffset, HEADER_SIZE, usedLength);
+		setFree(frameIx, offset, isFree, frames);
+		setFree(frameIx, footerOffset, isFree, frames);
+	}
+
+	static int getLength(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+		return getLength(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+	}
+
+	static int getLength(int frameIx, int offset, ByteBuffer[] frames,
+			ByteBuffer convertBuffer) {
+		convertBuffer.clear();
+		for (int i = 0; i < 4 - HEADER_SIZE; i++) { // padding
+			convertBuffer.put(i, (byte) 0x00);
+		}
+
+		convertBuffer.put(4 - HEADER_SIZE,
+				(byte) ((frames[frameIx].get(offset)) & (MASK)));
+		System.arraycopy(frames[frameIx].array(), offset + 1,
+				convertBuffer.array(), 5 - HEADER_SIZE, HEADER_SIZE - 1);
+		return convertBuffer.getInt(0);
+	}
+
+	// MSB equal to 1 means FREE
+	static boolean isFree(int frameIx, int offset, ByteBuffer[] frames) {
+		return ((((frames[frameIx]).array()[offset]) & 0x80) == 0x80);
+	}
+
+	static void setFree(int frameIx, int offset, boolean free,
+			ByteBuffer[] frames) {
+		if (free) { // set MSB to 1 (for free)
+			frames[frameIx].put(offset,
+					(byte) (((frames[frameIx]).array()[offset]) | 0x80));
+		} else { // set MSB to 0 (for occupied)
+			frames[frameIx].put(offset,
+					(byte) (((frames[frameIx]).array()[offset]) & 0x7F));
+		}
+	}
+
+	static int getActualLength(int l) {
+		int r = (l + 2 * HEADER_SIZE) % MINIMUM_FREE_SLOT_SIZE;
+		return (r == 0 ? l : (l + (BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE - r)));
+	}
+
+	private static int retrieveAsInt(ByteBuffer b, int fromIndex, int size,
+			ByteBuffer convertBuffer) {
+		if ((b.get(fromIndex) & INVALID) == INVALID) {
+			return INVALID_INDEX;
+		}
+
+		convertBuffer.clear();
+		for (int i = 0; i < 4 - size; i++) { // padding
+			convertBuffer.put(i, (byte) 0x00);
+		}
+
+		System.arraycopy(b.array(), fromIndex, convertBuffer.array(), 4 - size,
+				size);
+		return convertBuffer.getInt(0);
+	}
+
+	private static void storeInt(ByteBuffer b, int fromIndex, int size,
+			int value) {
+		if (value == INVALID_INDEX) {
+			b.put(fromIndex, INVALID);
+			return;
+		}
+		for (int i = 0; i < size; i++) {
+			b.put(fromIndex + i,
+					(byte) ((value >>> (8 * ((size - 1 - i)))) & 0xff));
+		}
+	}
+}
\ No newline at end of file
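As a worked example of the layout above (illustrative, not part of this patch): a free slot must hold four pointers (left child, right child, next, prev), each a 4-byte frame index plus a 2-byte offset, between a 2-byte header and footer, i.e. at least 2 + 4 * (4 + 2) + 2 = 28 bytes. Rounding every slot up so that header + body + footer is a multiple of MINIMUM_FREE_SLOT_SIZE (32) therefore guarantees room for a BST node. The constants and getActualLength() are package-private, so this sketch only compiles inside the same package:

    int used = 50;                                     // tuple payload length in bytes
    int actual = BSTNodeUtil.getActualLength(used);    // 60: padded body length
    int total = actual + 2 * BSTNodeUtil.HEADER_SIZE;  // 64, i.e. 2 * MINIMUM_FREE_SLOT_SIZE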
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
new file mode 100644
index 0000000..1cc15cf
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * @author pouria
+ * 
+ *         Defines the operations required of any memory manager used in
+ *         sorting with replacement selection to manage the free space.
+ */
+
+public interface IMemoryManager {
+
+	/**
+	 * Allocates a free slot of length equal to or greater than the requested
+	 * length. A pointer to the allocated slot is placed in result and thereby
+	 * returned to the caller. If no suitable free slot is available, result
+	 * holds the null/invalid pointer (the exact representation may vary
+	 * between implementations).
+	 * 
+	 * @param length
+	 * @param result
+	 * @throws HyracksDataException
+	 */
+	void allocate(int length, Slot result) throws HyracksDataException;
+
+	/**
+	 * Unallocates the specified slot (returning it to the set of free slots).
+	 * 
+	 * @param s
+	 * @return the total length of the unallocated slot
+	 * @throws HyracksDataException
+	 */
+	int unallocate(Slot s) throws HyracksDataException;
+
+	/**
+	 * @param frameIndex
+	 * @return the specified frame from the set of memory buffers managed by
+	 *         this memory manager
+	 */
+	ByteBuffer getFrame(int frameIndex);
+
+	/**
+	 * Writes the specified tuple into the specified memory slot (denoted by
+	 * frameIX and offset)
+	 * 
+	 * @param frameIx
+	 * @param offset
+	 * @param src
+	 * @param tIndex
+	 * @return
+	 */
+	boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src,
+			int tIndex);
+
+	/**
+	 * Reads the specified tuple (denoted by frameIx and offset) and appends it
+	 * to the passed FrameTupleAppender
+	 * 
+	 * @param frameIx
+	 * @param offset
+	 * @param dest
+	 * @return
+	 */
+	boolean readTuple(int frameIx, int offset, FrameTupleAppender dest);
+
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
new file mode 100644
index 0000000..1d55bc8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+
+/**
+ * @author pouria
+ * 
+ *         Interface for the run generator
+ */
+public interface IRunGenerator extends IFrameWriter {
+	/**
+	 * 
+	 * @return the list of generated runs, each stored sorted
+	 */
+	public List<IFrameReader> getRuns();
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
new file mode 100644
index 0000000..2cc731c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+/**
+ * @author pouria
+ * 
+ *         Defines the selection tree used in sorting with replacement
+ *         selection to decide the order in which tuples are output into the
+ *         runs during the run generation phase. At any point in time the tree
+ *         contains tuples belonging to two different runs:
+ *         - the current run, being written to the output
+ *         - the next run
+ */
+public interface ISelectionTree {
+	/**
+	 * Inserts a new element into the selection tree.
+	 * 
+	 * @param element
+	 *            contains the pointer to the memory slot holding the tuple,
+	 *            along with the tuple's run number
+	 */
+	void insert(int[] element);
+
+	/**
+	 * @return Removes and returns the smallest element in the tree
+	 */
+	int[] getMin();
+
+	/**
+	 * @return Removes and returns the largest element in the tree
+	 */
+	int[] getMax();
+
+	/**
+	 * @return True if the selection tree contains no elements, false
+	 *         otherwise.
+	 */
+	boolean isEmpty();
+
+	/**
+	 * Removes all the elements in the tree
+	 */
+	void reset();
+
+	/**
+	 * 
+	 * @return Returns, but does NOT remove, the smallest element in the tree
+	 */
+	int[] peekMin();
+
+	/**
+	 * 
+	 * @return Returns, but does NOT remove, the largest element in the tree
+	 */
+	int[] peekMax();
+
+}
\ No newline at end of file
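To illustrate how this interface cooperates with IMemoryManager during run generation (a sketch only, not this patch's exact code): when an allocation fails, the generator evicts the tree's minimum tuples to the current run until a large enough free slot appears. The [runId, frameIx, offset, pnk] element layout is an assumption based on the javadoc above, and `memMgr`, `sTree`, `allocationPtr`, `outputAppender`, `outputBuffer` and `writer` are assumed to be set up as in the run generator:

    memMgr.allocate(tLength, allocationPtr);
    while (allocationPtr.isNull() && !sTree.isEmpty()) {
        int[] top = sTree.getMin();                       // smallest resident tuple: [runId, frameIx, offset, pnk]
        if (!memMgr.readTuple(top[1], top[2], outputAppender)) {
            FrameUtils.flushFrame(outputBuffer, writer);  // output frame full: flush it, then retry the append
            outputAppender.reset(outputBuffer, true);
            memMgr.readTuple(top[1], top[2], outputAppender);
        }
        memMgr.unallocate(new Slot(top[1], top[2]));      // return the freed slot to the memory manager
        memMgr.allocate(tLength, allocationPtr);          // retry the allocation for the incoming tuple
    }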
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
new file mode 100644
index 0000000..e617ccb
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -0,0 +1,229 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * @author pouria
+ * 
+ *         Operator descriptor for sorting with replacement selection. It
+ *         consists of two phases:
+ * 
+ *         - Run generation (OptimizedSortActivity below), in which sorted runs
+ *         are generated from the input data. This phase uses the selection
+ *         tree and the memory manager to apply the replacement selection
+ *         optimization and produce runs longer than the available memory.
+ * 
+ *         - Merging (OptimizedMergeActivity below), in which the runs
+ *         generated in the previous phase are merged. Each run gets a single
+ *         in-memory buffer, and a priority queue selects the top tuple at each
+ *         step and sends it to the next run or the final output.
+ */
+public class OptimizedExternalSortOperatorDescriptor extends
+		AbstractOperatorDescriptor {
+
+	private static final int NO_LIMIT = -1;
+
+	private static final long serialVersionUID = 1L;
+
+	private static final int SORT_ACTIVITY_ID = 0;
+	private static final int MERGE_ACTIVITY_ID = 1;
+
+	private final int[] sortFields;
+	private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+	private final IBinaryComparatorFactory[] comparatorFactories;
+	private final int memSize;
+	private final int outputLimit;
+
+	public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
+			int framesLimit, int[] sortFields,
+			IBinaryComparatorFactory[] comparatorFactories,
+			RecordDescriptor recordDescriptor) {
+		this(spec, framesLimit, NO_LIMIT, sortFields, null,
+				comparatorFactories, recordDescriptor);
+	}
+
+	public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
+			int framesLimit, int outputLimit, int[] sortFields,
+			IBinaryComparatorFactory[] comparatorFactories,
+			RecordDescriptor recordDescriptor) {
+		this(spec, framesLimit, outputLimit, sortFields, null,
+				comparatorFactories, recordDescriptor);
+	}
+
+	public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
+			int memSize, int outputLimit, int[] sortFields,
+			INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+			IBinaryComparatorFactory[] comparatorFactories,
+			RecordDescriptor recordDescriptor) {
+		super(spec, 1, 1);
+		this.memSize = memSize;
+		this.outputLimit = outputLimit;
+		this.sortFields = sortFields;
+		this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+		this.comparatorFactories = comparatorFactories;
+		if (memSize <= 1) {
+			throw new IllegalStateException();// minimum of 2 frames (1 in, 1 out)
+		}
+		recordDescriptors[0] = recordDescriptor;
+	}
+
+	@Override
+	public void contributeActivities(IActivityGraphBuilder builder) {
+		OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(
+				odId, SORT_ACTIVITY_ID));
+		OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(
+				odId, MERGE_ACTIVITY_ID));
+
+		builder.addActivity(osa);
+		builder.addSourceEdge(0, osa, 0);
+
+		builder.addActivity(oma);
+		builder.addTargetEdge(0, oma, 0);
+
+		builder.addBlockingEdge(osa, oma);
+	}
+
+	public static class OptimizedSortTaskState extends AbstractTaskState {
+		private List<IFrameReader> runs;
+
+		public OptimizedSortTaskState() {
+		}
+
+		private OptimizedSortTaskState(JobId jobId, TaskId taskId) {
+			super(jobId, taskId);
+		}
+
+		@Override
+		public void toBytes(DataOutput out) throws IOException {
+
+		}
+
+		@Override
+		public void fromBytes(DataInput in) throws IOException {
+
+		}
+	}
+
+	private class OptimizedSortActivity extends AbstractActivityNode {
+		private static final long serialVersionUID = 1L;
+
+		public OptimizedSortActivity(ActivityId id) {
+			super(id);
+		}
+
+		@Override
+		public IOperatorNodePushable createPushRuntime(
+				final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+				IRecordDescriptorProvider recordDescProvider,
+				final int partition, int nPartitions) {
+			final IRunGenerator runGen;
+			if (outputLimit == NO_LIMIT) {
+				runGen = new OptimizedExternalSortRunGenerator(ctx, sortFields,
+						firstKeyNormalizerFactory, comparatorFactories,
+						recordDescriptors[0], memSize);
+			} else {
+				runGen = new OptimizedExternalSortRunGeneratorWithLimit(ctx,
+						sortFields, firstKeyNormalizerFactory,
+						comparatorFactories, recordDescriptors[0], memSize,
+						outputLimit);
+			}
+
+			IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+				@Override
+				public void open() throws HyracksDataException {
+
+					runGen.open();
+				}
+
+				@Override
+				public void nextFrame(ByteBuffer buffer)
+						throws HyracksDataException {
+					runGen.nextFrame(buffer);
+				}
+
+				@Override
+				public void close() throws HyracksDataException {
+					OptimizedSortTaskState state = new OptimizedSortTaskState(
+							ctx.getJobletContext().getJobId(), new TaskId(
+									getActivityId(), partition));
+					runGen.close();
+					state.runs = runGen.getRuns();
+					env.setTaskState(state);
+
+				}
+
+				@Override
+				public void fail() throws HyracksDataException {
+					runGen.fail();
+				}
+			};
+			return op;
+		}
+	}
+
+	private class OptimizedMergeActivity extends AbstractActivityNode {
+		private static final long serialVersionUID = 1L;
+
+		public OptimizedMergeActivity(ActivityId id) {
+			super(id);
+		}
+
+		@Override
+		public IOperatorNodePushable createPushRuntime(
+				final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+				IRecordDescriptorProvider recordDescProvider,
+				final int partition, int nPartitions) {
+			IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+				@Override
+				public void initialize() throws HyracksDataException {
+					OptimizedSortTaskState state = (OptimizedSortTaskState) env
+							.getTaskState(new TaskId(new ActivityId(
+									getOperatorId(), SORT_ACTIVITY_ID),
+									partition));
+
+					List<IFrameReader> runs = state.runs;
+
+					IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+					for (int i = 0; i < comparatorFactories.length; ++i) {
+						comparators[i] = comparatorFactories[i]
+								.createBinaryComparator();
+					}
+
+					OptimizedExternalSortRunMerger merger = new OptimizedExternalSortRunMerger(
+							ctx, outputLimit, runs, sortFields, comparators,
+							recordDescriptors[0], memSize, writer);
+
+					merger.process();
+
+				}
+			};
+			return op;
+		}
+	}
+}
\ No newline at end of file
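Usage sketch (not part of this patch): wiring the operator into a job. The `spec`, `source` and `sink` operators, `sortRecDesc` and `keyComparatorFactory` are assumed to exist elsewhere, and the connector calls are the standard Hyracks job-building API:

    int framesLimit = 32;                       // in-memory budget, in frames (must be at least 2)
    int[] sortFields = new int[] { 0 };         // sort on the first field
    IBinaryComparatorFactory[] cmps = new IBinaryComparatorFactory[] { keyComparatorFactory };
    OptimizedExternalSortOperatorDescriptor sorter =
            new OptimizedExternalSortOperatorDescriptor(spec, framesLimit, sortFields, cmps, sortRecDesc);
    spec.connect(new OneToOneConnectorDescriptor(spec), source, 0, sorter, 0);
    spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, sink, 0);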
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
new file mode 100644
index 0000000..13ba5c7
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
@@ -0,0 +1,313 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+/**
+ * @author pouria
+ * 
+ *         This class implements the run generator for sorting with replacement
+ *         selection when there is no limit on the output, i.e. the whole data
+ *         set must be sorted. A SortMinHeap is used as the selection tree to
+ *         decide the order in which tuples are written into the runs, while
+ *         the memory manager, based on a binary search tree, allocates memory
+ *         slots for the tuples.
+ * 
+ *         The overall process is as follows:
+ *         - Read the input data frame by frame. For each tuple T in the
+ *         current frame:
+ *         - Try to allocate a memory slot for T along with the attached
+ *         header/footer (needed for memory management).
+ *         - If T cannot be allocated, output tuples currently resident in
+ *         memory until a free slot large enough to hold T is created; the min
+ *         heap decides which tuple is output at each step. Then write T into
+ *         memory.
+ *         - Calculate the run ID of T, based on the last tuple output for the
+ *         current run: it is either the current run or the next run. Also
+ *         compute T's poor man's normalized key (PNK) to speed up later
+ *         comparisons.
+ *         - Create a heap element for T containing its run ID, the slot
+ *         pointer to its memory location, and its PNK, and insert it into the
+ *         heap.
+ *         - Upon closing, write all tuples currently resident in memory into
+ *         their corresponding run(s); again the min heap decides which tuple
+ *         is output next.
+ * 
+ *         OptimizedExternalSortOperatorDescriptor then merges the generated
+ *         runs to produce the final sorted output of the data.
+ */
+public class OptimizedExternalSortRunGenerator implements IRunGenerator {
+	private final IHyracksTaskContext ctx;
+	private final int[] sortFields;
+	private final INormalizedKeyComputer nkc;
+	IBinaryComparatorFactory[] comparatorFactories;
+	private final IBinaryComparator[] comparators;
+	private final RecordDescriptor recordDescriptor;
+	private final List<IFrameReader> runs;
+
+	private ISelectionTree sTree;
+	private IMemoryManager memMgr;
+
+	private final int memSize;
+	private FrameTupleAccessor inputAccessor; // Used to read tuples in
+												// nextFrame()
+	private FrameTupleAppender outputAppender; // Used to write tuple to the
+												// dedicated output buffer
+	private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
+										// into run(s)
+	private FrameTupleAccessor lastRecordAccessor; // Used to read last output
+													// record from the output
+													// buffer
+	private int curRunSize;
+	private int lastTupleIx; // Holds index of last output tuple in the
+								// dedicated output buffer
+	private Slot allocationPtr; // Contains the ptr to the allocated memory slot
+								// by the memory manager for the new tuple
+	private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
+								// the selectionTree to output
+	private int[] sTreeTop;
+
+	RunFileWriter writer;
+
+	private boolean newRun;
+	private int curRunId;
+
+	public OptimizedExternalSortRunGenerator(IHyracksTaskContext ctx,
+			int[] sortFields,
+			INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+			IBinaryComparatorFactory[] comparatorFactories,
+			RecordDescriptor recordDesc, int memSize) {
+		this.ctx = ctx;
+		this.sortFields = sortFields;
+		nkc = firstKeyNormalizerFactory == null ? null
+				: firstKeyNormalizerFactory.createNormalizedKeyComputer();
+		this.comparatorFactories = comparatorFactories;
+		comparators = new IBinaryComparator[comparatorFactories.length];
+		for (int i = 0; i < comparatorFactories.length; ++i) {
+			comparators[i] = comparatorFactories[i].createBinaryComparator();
+		}
+		this.recordDescriptor = recordDesc;
+		this.runs = new LinkedList<IFrameReader>();
+		this.memSize = memSize;
+	}
+
+	@Override
+	public void open() throws HyracksDataException {
+		runs.clear();
+		inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+				recordDescriptor);
+		outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+		outputBuffer = ctx.allocateFrame();
+		outputAppender.reset(outputBuffer, true);
+		lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+				recordDescriptor);
+
+		this.memMgr = new BSTMemMgr(ctx, memSize);
+		this.sTree = new SortMinHeap(ctx, sortFields, comparatorFactories,
+				recordDescriptor, memMgr);
+		this.allocationPtr = new Slot();
+		this.outputedTuple = new Slot();
+		this.sTreeTop = new int[] { -1, -1, -1, -1 };
+		curRunId = -1;
+		openNewRun();
+	}
+
+	@Override
+	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+		inputAccessor.reset(buffer);
+		byte[] bufferArray = buffer.array();
+		int tupleCount = inputAccessor.getTupleCount();
+		for (int i = 0; i < tupleCount; ++i) {
+			allocationPtr.clear();
+			int tLength = inputAccessor.getTupleEndOffset(i)
+					- inputAccessor.getTupleStartOffset(i);
+			memMgr.allocate(tLength, allocationPtr);
+			while (allocationPtr.isNull()) {
+				int unAllocSize = -1;
+				while (unAllocSize < tLength) {
+					unAllocSize = outputRecord();
+					if (unAllocSize < 1) {
+						throw new HyracksDataException(
+								"Unable to allocate space for the new tuple, and there is no more tuple to output");
+					}
+				}
+				memMgr.allocate(tLength, allocationPtr);
+				((BSTMemMgr) memMgr)._debug_decLookupCount();
+			}
+			memMgr.writeTuple(allocationPtr.getFrameIx(),
+					allocationPtr.getOffset(), inputAccessor, i);
+			int runId = getRunId(inputAccessor, i);
+			int pnk = getPNK(inputAccessor, i, bufferArray);
+			int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
+					allocationPtr.getOffset(), pnk };
+			sTree.insert(entry);
+		}
+	}
+
+	@Override
+	public void fail() throws HyracksDataException {
+	}
+
+	@Override
+	public void close() throws HyracksDataException {
+		while (!sTree.isEmpty()) { // Outputting remaining elements in the
+									// selectionTree
+			outputRecord();
+		}
+		if (outputAppender.getTupleCount() > 0) { // Writing out very last
+													// resident records to file
+			FrameUtils.flushFrame(outputBuffer, writer);
+		}
+		outputAppender.reset(outputBuffer, true);
+		writer.close();
+		runs.add(writer.createReader());
+	}
+
+	public List<IFrameReader> getRuns() {
+		return runs;
+	}
+
+	private int outputRecord() throws HyracksDataException {
+		outputedTuple.clear();
+		sTreeTop = sTree.getMin();
+		if (!isEntryValid(sTreeTop)) {
+			throw new HyracksDataException(
+					"Invalid tuple to output (top of the selection tree is invalid)");
+		}
+
+		if (sTreeTop[SortMinHeap.RUN_ID_IX] != curRunId) { // We need to switch
+															// runs
+			openNewRun();
+		}
+
+		int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+		int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+		if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+			// Cannot append to the tuple appender; flush the output buffer first
+			FrameUtils.flushFrame(outputBuffer, writer);
+			outputAppender.reset(outputBuffer, true);
+			if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+				throw new HyracksDataException(
+						"Cannot append to the output buffer in sort");
+			}
+			lastTupleIx = 0;
+		} else {
+			lastTupleIx++;
+		}
+		outputedTuple.set(tFrameIx, tOffset);
+		newRun = false;
+		curRunSize++;
+		return memMgr.unallocate(outputedTuple);
+
+	}
+
+	// buffInArray is passed in by the caller for better performance, so that
+	// the frame is not converted to a byte array for each and every tuple.
+	private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
+		int sfIdx = sortFields[0];
+		int tStart = fta.getTupleStartOffset(tIx);
+		int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
+		int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
+		int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
+		return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel
+				- f0StartRel));
+	}
+
+	// Decides the run ID of the current record by comparing it to the last
+	// output record.
+	private int getRunId(FrameTupleAccessor fta, int tupIx) {
+		if (newRun) { // Very first record for a new run
+			return curRunId;
+		}
+
+		byte[] lastRecBuff = outputBuffer.array();
+		lastRecordAccessor.reset(outputBuffer);
+		int lastStartOffset = lastRecordAccessor
+				.getTupleStartOffset(lastTupleIx);
+
+		ByteBuffer fr2 = fta.getBuffer();
+		byte[] curRecBuff = fr2.array();
+		int r2StartOffset = fta.getTupleStartOffset(tupIx);
+
+		for (int f = 0; f < comparators.length; ++f) {
+			int fIdx = sortFields[f];
+			int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset
+					+ (fIdx - 1) * 4);
+			int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
+			int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength()
+					+ f1Start;
+			int l1 = f1End - f1Start;
+			int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
+					* 4);
+			int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+			int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
+			int l2 = f2End - f2Start;
+			int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2,
+					l2);
+			if (c != 0) {
+				if (c <= 0) {
+					return curRunId;
+				} else {
+					return (curRunId + 1);
+				}
+			}
+		}
+		return curRunId;
+	}
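+
+	/*
+	 * For example, if the last tuple written to the current run has key 25,
+	 * an incoming tuple with key 30 can still go to the current run, while an
+	 * incoming tuple with key 20 has to be tagged with the next run's ID.
+	 */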
+
+	private void openNewRun() throws HyracksDataException {
+		if (writer != null) { // There is a prev run, so flush its tuples and
+								// close it first
+			if (outputAppender.getTupleCount() > 0) {
+				FrameUtils.flushFrame(outputBuffer, writer);
+			}
+			outputAppender.reset(outputBuffer, true);
+			writer.close();
+			runs.add(writer.createReader());
+		}
+
+		FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+				ExternalSortRunGenerator.class.getSimpleName());
+		writer = new RunFileWriter(file, ctx.getIOManager());
+		writer.open();
+		curRunId++;
+		newRun = true;
+		curRunSize = 0;
+		lastTupleIx = -1;
+	}
+
+	private boolean isEntryValid(int[] entry) {
+		return ((entry[SortMinHeap.RUN_ID_IX] > -1)
+				&& (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
+	}
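+
+	/*
+	 * A minimal usage sketch, assuming an upstream operator that produces
+	 * frames; the variable names used here (e.g. inputFrames) are illustrative
+	 * placeholders, not part of this class:
+	 *
+	 *   OptimizedExternalSortRunGenerator runGen =
+	 *           new OptimizedExternalSortRunGenerator(ctx, sortFields,
+	 *                   firstKeyNormalizerFactory, comparatorFactories,
+	 *                   recordDescriptor, memSize);
+	 *   runGen.open();
+	 *   for (ByteBuffer frame : inputFrames) {
+	 *       runGen.nextFrame(frame); // tuples are spilled to runs as memory fills up
+	 *   }
+	 *   runGen.close();
+	 *   List<IFrameReader> runs = runGen.getRuns(); // handed to the run merger
+	 */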
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
new file mode 100644
index 0000000..df40577
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
@@ -0,0 +1,475 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+/**
+ * @author pouria
+ * 
+ *         This class implements the run generator for sorting with replacement
+ *         selection, where there is a limit on the output, i.e. we are only
+ *         interested in the top-K tuples (the K smallest tuples w.r.t. the
+ *         sorting keys), where the limit K is decided in advance.
+ * 
+ *         A SortMinMaxHeap is used as the selection tree to decide the order
+ *         in which tuples are written into the runs, and also to prune tuples
+ *         where possible. The memory manager is based on a binary search tree
+ *         and allocates the tuples in memory.
+ * 
+ *         The overall process is as follows (assuming that the limit is K):
+ * 
+ *         - Read the input data frame by frame. For each tuple T in the
+ *         current frame:
+ * 
+ *         - If the current run R has already reached the size limit K, and T
+ *         is not smaller than the maximum tuple currently in the heap, then
+ *         ignore T.
+ * 
+ *         - Otherwise, try to allocate a memory slot for writing T along with
+ *         the attached header/footer (used for memory management purposes).
+ * 
+ *         - If T cannot be allocated, output as many of the tuples currently
+ *         resident in memory as needed, until a free slot large enough to hold
+ *         T is created. The min-max heap decides which tuple is output at each
+ *         step. Then write T into memory.
+ * 
+ *         - Calculate the run ID of T (based on the last tuple output for the
+ *         current run); it is either the current run or the next run. Also
+ *         calculate the Poor man's Normalized Key (PNK) of T, to make later
+ *         comparisons faster.
+ * 
+ *         - Create a heap element for T, containing its run ID, the slot
+ *         pointer to its memory location, and its PNK.
+ * 
+ *         - If the run ID is the next run, insert the heap element into the
+ *         heap and increment the size of the next run in the stats.
+ * 
+ *         - If the run ID is the current run:
+ *         - If the current run has not yet hit the limit K, insert the element
+ *         into the heap and increment the current run size in the stats.
+ *         - Otherwise (the current run has hit the limit K, while T is smaller
+ *         than the current maximum), discard the current maximum of the
+ *         current run (by popping it from the heap and unallocating its memory
+ *         slot) and insert the heap element into the heap. The current run
+ *         size does not change, as an old element (the old maximum) is simply
+ *         replaced by T.
+ * 
+ *         - Upon closing, write all the tuples currently resident in memory
+ *         into their corresponding run(s).
+ * 
+ *         - Note that upon opening a new run R, if the size of R (based on the
+ *         stats) is S and S > K, then the (S-K) current maximum tuples of R
+ *         (which are resident in memory) are discarded first. The min-max heap
+ *         is used to find these tuples.
+ * 
+ *         (A worked example of the per-tuple decision appears near the end of
+ *         this class.)
+ */
+public class OptimizedExternalSortRunGeneratorWithLimit implements
+		IRunGenerator {
+
+	private final IHyracksTaskContext ctx;
+	private final int[] sortFields;
+	private final INormalizedKeyComputer nkc;
+	IBinaryComparatorFactory[] comparatorFactories;
+	private final IBinaryComparator[] comparators;
+	private final RecordDescriptor recordDescriptor;
+	private final List<IFrameReader> runs;
+
+	private ISelectionTree sTree;
+	private IMemoryManager memMgr;
+
+	private final int memSize;
+	private FrameTupleAccessor inputAccessor; // Used to read tuples in
+												// nextFrame()
+	private FrameTupleAppender outputAppender; // Used to write tuple to the
+												// dedicated output buffer
+	private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
+										// into run(s)
+	private FrameTupleAccessor lastRecordAccessor; // Used to read last output
+													// record from the output
+													// buffer
+	private FrameTupleAccessor fta2; // Used to read max record
+	private final int outputLimit;
+	private int curRunSize;
+	private int nextRunSize;
+	private int lastTupleIx; // Holds index of last output tuple in the
+								// dedicated output buffer
+	private Slot allocationPtr; // Contains the ptr to the allocated memory slot
+								// by the memory manager for the new tuple
+	private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
+								// the selectionTree to output
+	private Slot discard;
+	private int[] sTreeTop;
+	private int[] peek;
+	RunFileWriter writer;
+	private boolean newRun;
+	private int curRunId;
+
+	public OptimizedExternalSortRunGeneratorWithLimit(IHyracksTaskContext ctx,
+			int[] sortFields,
+			INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+			IBinaryComparatorFactory[] comparatorFactories,
+			RecordDescriptor recordDesc, int memSize, int limit) {
+
+		this.ctx = ctx;
+		this.sortFields = sortFields;
+		nkc = firstKeyNormalizerFactory == null ? null
+				: firstKeyNormalizerFactory.createNormalizedKeyComputer();
+		this.comparatorFactories = comparatorFactories;
+		comparators = new IBinaryComparator[comparatorFactories.length];
+		for (int i = 0; i < comparatorFactories.length; ++i) {
+			comparators[i] = comparatorFactories[i].createBinaryComparator();
+		}
+		this.recordDescriptor = recordDesc;
+		this.runs = new LinkedList<IFrameReader>();
+		this.memSize = memSize;
+
+		this.outputLimit = limit;
+	}
+
+	@Override
+	public void open() throws HyracksDataException {
+		runs.clear();
+		inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+				recordDescriptor);
+		outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+		outputBuffer = ctx.allocateFrame();
+		outputAppender.reset(outputBuffer, true);
+		lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+				recordDescriptor);
+		fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+		this.memMgr = new BSTMemMgr(ctx, memSize);
+		this.sTree = new SortMinMaxHeap(ctx, sortFields, comparatorFactories,
+				recordDescriptor, memMgr);
+		this.allocationPtr = new Slot();
+		this.outputedTuple = new Slot();
+		this.sTreeTop = new int[] { -1, -1, -1, -1 };
+		this.peek = new int[] { -1, -1, -1, -1 };
+		this.discard = new Slot();
+
+		curRunId = -1;
+		curRunSize = 0;
+		nextRunSize = 0;
+		openNewRun();
+	}
+
+	@Override
+	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+		inputAccessor.reset(buffer);
+		byte[] bufferArray = buffer.array();
+		int tupleCount = inputAccessor.getTupleCount();
+		for (int i = 0; i < tupleCount; i++) {
+			if (curRunSize >= outputLimit) {
+				peek = sTree.peekMax();
+				if (isEntryValid(peek)
+						&& compareRecords(inputAccessor, i,
+								peek[SortMinMaxHeap.FRAME_IX],
+								peek[SortMinMaxHeap.OFFSET_IX]) >= 0) {
+					continue;
+				}
+			}
+
+			allocationPtr.clear();
+			int tLength = inputAccessor.getTupleEndOffset(i)
+					- inputAccessor.getTupleStartOffset(i);
+			memMgr.allocate(tLength, allocationPtr);
+			while (allocationPtr.isNull()) {
+				int unAllocSize = -1;
+				while (unAllocSize < tLength) {
+					unAllocSize = outputRecord();
+					if (unAllocSize < 1) {
+						throw new HyracksDataException(
+								"Unable to allocate space for the new tuple, and there is no more tuple to output");
+					}
+				}
+				memMgr.allocate(tLength, allocationPtr);
+			}
+
+			int pnk = getPNK(inputAccessor, i, bufferArray);
+			int runId = getRunId(inputAccessor, i);
+			if (runId != curRunId) { // tuple belongs to the next run
+				memMgr.writeTuple(allocationPtr.getFrameIx(),
+						allocationPtr.getOffset(), inputAccessor, i);
+				int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
+						allocationPtr.getOffset(), pnk };
+				sTree.insert(entry);
+				nextRunSize++;
+				continue;
+			}
+			// belongs to the current run
+			if (curRunSize < outputLimit) {
+				memMgr.writeTuple(allocationPtr.getFrameIx(),
+						allocationPtr.getOffset(), inputAccessor, i);
+				int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
+						allocationPtr.getOffset(), pnk };
+				sTree.insert(entry);
+				curRunSize++;
+				continue;
+			}
+
+			peek = sTree.peekMax();
+			if (compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
+					peek[SortMinMaxHeap.OFFSET_IX]) > 0) {
+				continue;
+			}
+			// replacing the max
+			peek = sTree.getMax();
+			discard.set(peek[SortMinMaxHeap.FRAME_IX],
+					peek[SortMinMaxHeap.OFFSET_IX]);
+			memMgr.unallocate(discard);
+			memMgr.writeTuple(allocationPtr.getFrameIx(),
+					allocationPtr.getOffset(), inputAccessor, i);
+			int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
+					allocationPtr.getOffset(), pnk };
+			sTree.insert(entry);
+		}
+	}
+
+	@Override
+	public void fail() throws HyracksDataException {
+	}
+
+	@Override
+	public void close() throws HyracksDataException {
+		while (!sTree.isEmpty()) { // Outputting remaining elements in the
+									// selectionTree
+			outputRecordForClose();
+		}
+
+		if (outputAppender.getTupleCount() > 0) { // Writing out very last
+													// resident records to file
+			FrameUtils.flushFrame(outputBuffer, writer);
+		}
+
+		writer.close();
+		runs.add(writer.createReader());
+	}
+
+	public List<IFrameReader> getRuns() {
+		return runs;
+	}
+
+	private int outputRecord() throws HyracksDataException {
+		outputedTuple.clear();
+		sTreeTop = sTree.getMin();
+		if (!isEntryValid(sTreeTop)) {
+			throw new HyracksDataException(
+					"Invalid tuple to output (top of the selection tree is invalid)");
+		}
+		int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+		int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+		if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] == curRunId) {
+			if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+				// Cannot append to the tuple appender; flush the output buffer first
+				FrameUtils.flushFrame(outputBuffer, writer);
+				outputAppender.reset(outputBuffer, true);
+				if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+					throw new HyracksDataException(
+							"Cannot append to the output buffer in sort");
+				}
+				lastTupleIx = 0;
+			} else {
+				lastTupleIx++;
+			}
+			outputedTuple.set(tFrameIx, tOffset);
+			newRun = false;
+			return memMgr.unallocate(outputedTuple);
+		}
+		// Minimum belongs to the next Run
+		openNewRun();
+		int popCount = curRunSize - outputLimit;
+		int l = 0;
+		int maxFreedSpace = 0;
+		for (int p = 0; p < popCount; p++) {
+			peek = sTree.getMax();
+			if (!isEntryValid(peek)) {
+				throw new HyracksDataException(
+						"Invalid Maximum extracted from MinMaxHeap");
+			}
+			discard.set(peek[SortMinMaxHeap.FRAME_IX],
+					peek[SortMinMaxHeap.OFFSET_IX]);
+			l = memMgr.unallocate(discard);
+			if (l > maxFreedSpace) {
+				maxFreedSpace = l;
+			}
+			curRunSize--;
+		}
+
+		if (maxFreedSpace != 0) {
+			return maxFreedSpace;
+		}
+		// No max was discarded (we just flushed out the previous run, so the
+		// output buffer should be clear)
+		if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+			throw new HyracksDataException(
+					"Cannot append to the output buffer in sort");
+		}
+		lastTupleIx = 0;
+		outputedTuple.set(tFrameIx, tOffset);
+		newRun = false;
+		return memMgr.unallocate(outputedTuple);
+	}
+
+	private void outputRecordForClose() throws HyracksDataException {
+		sTreeTop = sTree.getMin();
+		if (!isEntryValid(sTreeTop)) {
+			throw new HyracksDataException(
+					"Invalid tuple to output (top of the selection tree is invalid)");
+		}
+		int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+		int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+		if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] != curRunId) {
+			openNewRun();
+		}
+
+		if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+			// Cannot append to the tuple appender; flush the output buffer first
+			FrameUtils.flushFrame(outputBuffer, writer);
+			outputAppender.reset(outputBuffer, true);
+			if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+				throw new HyracksDataException(
+						"Cannot append to the output buffer in sort");
+			}
+		}
+	}
+
+	// buffInArray is passed in by the caller for better performance, so that
+	// the frame is not converted to a byte array for each and every tuple.
+	private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
+		int sfIdx = sortFields[0];
+		int tStart = fta.getTupleStartOffset(tIx);
+		int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
+		int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
+		int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
+		return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel
+				- f0StartRel));
+	}
+
+	// Decides the run ID of the current record by comparing it to the last
+	// output record.
+	private int getRunId(FrameTupleAccessor fta, int tupIx) {
+		if (newRun) { // Very first record for a new run
+			return curRunId;
+		}
+
+		byte[] lastRecBuff = outputBuffer.array();
+		lastRecordAccessor.reset(outputBuffer);
+		int lastStartOffset = lastRecordAccessor
+				.getTupleStartOffset(lastTupleIx);
+
+		ByteBuffer fr2 = fta.getBuffer();
+		byte[] curRecBuff = fr2.array();
+		int r2StartOffset = fta.getTupleStartOffset(tupIx);
+
+		for (int f = 0; f < comparators.length; ++f) {
+			int fIdx = sortFields[f];
+			int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset
+					+ (fIdx - 1) * 4);
+			int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
+			int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength()
+					+ f1Start;
+			int l1 = f1End - f1Start;
+			int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
+					* 4);
+			int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+			int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
+			int l2 = f2End - f2Start;
+			int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2,
+					l2);
+			if (c != 0) {
+				if (c <= 0) {
+					return curRunId;
+				} else {
+					return (curRunId + 1);
+				}
+			}
+		}
+		return curRunId;
+	}
+
+	// Returns a negative value if the first record is smaller than the second.
+	private int compareRecords(FrameTupleAccessor fta1, int ix1, int fix2,
+			int offset2) {
+		ByteBuffer buff1 = fta1.getBuffer();
+		byte[] recBuff1 = buff1.array();
+		int offset1 = fta1.getTupleStartOffset(ix1);
+
+		offset2 += BSTNodeUtil.HEADER_SIZE;
+		ByteBuffer buff2 = memMgr.getFrame(fix2);
+		fta2.reset(buff2);
+		byte[] recBuff2 = buff2.array();
+
+		for (int f = 0; f < comparators.length; ++f) {
+			int fIdx = sortFields[f];
+			int f1Start = fIdx == 0 ? 0 : buff1
+					.getInt(offset1 + (fIdx - 1) * 4);
+			int f1End = buff1.getInt(offset1 + fIdx * 4);
+			int s1 = offset1 + fta1.getFieldSlotsLength() + f1Start;
+			int l1 = f1End - f1Start;
+			int f2Start = fIdx == 0 ? 0 : buff2
+					.getInt(offset2 + (fIdx - 1) * 4);
+			int f2End = buff2.getInt(offset2 + fIdx * 4);
+			int s2 = offset2 + fta2.getFieldSlotsLength() + f2Start;
+			int l2 = f2End - f2Start;
+			int c = comparators[f].compare(recBuff1, s1, l1, recBuff2, s2, l2);
+
+			if (c != 0) {
+				return c;
+			}
+		}
+		return 0;
+
+	}
+
+	private void openNewRun() throws HyracksDataException {
+		if (writer != null) { // There is a prev run, so flush its tuples and
+								// close it first
+			if (outputAppender.getTupleCount() > 0) {
+				FrameUtils.flushFrame(outputBuffer, writer);
+			}
+			outputAppender.reset(outputBuffer, true);
+			writer.close();
+			runs.add(writer.createReader());
+		}
+
+		FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+				ExternalSortRunGenerator.class.getSimpleName());
+		writer = new RunFileWriter(file, ctx.getIOManager());
+		writer.open();
+		curRunId++;
+		newRun = true;
+		curRunSize = nextRunSize;
+		nextRunSize = 0;
+		lastTupleIx = -1;
+	}
+
+	private boolean isEntryValid(int[] entry) {
+		return ((entry[SortMinHeap.RUN_ID_IX] > -1)
+				&& (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
+	}
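+
+	/*
+	 * A worked example of the per-tuple decision in nextFrame(), assuming the
+	 * limit K = 3 and a current run whose three in-memory tuples have sort
+	 * keys {10, 20, 30} (so 30 is the current maximum in the heap):
+	 *
+	 *   - An incoming tuple with key 40 is ignored right away: the current run
+	 *     has already reached K tuples and 40 is not smaller than the maximum.
+	 *   - An incoming tuple with key 15 that belongs to the current run
+	 *     replaces the maximum: 30 is popped from the heap and its memory slot
+	 *     is unallocated, 15 is inserted, and curRunSize stays at 3.
+	 *   - An incoming tuple that getRunId() assigns to the next run is simply
+	 *     inserted, and nextRunSize is incremented; the limit is enforced per
+	 *     run.
+	 */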
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
new file mode 100644
index 0000000..67f3498
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+/**
+ * @author pouria
+ * 
+ *         This class defines the code for the merger, which is used to merge
+ *         the runs generated previously by an implementation of IRunGenerator,
+ *         for sorting with replacement selection.
+ * 
+ *         If the number of input runs is less than the number of available
+ *         memory frames, merging is done in one pass, by allocating one buffer
+ *         per run plus one output buffer. A priority queue is used to find the
+ *         smallest head tuple among all the runs in memory at each iteration
+ *         (see RunMergingFrameReader for details). Otherwise, assuming R runs
+ *         and M memory buffers with R > M, the first (M-1) runs are merged
+ *         into a new sorted run. The first (M-1) runs are then discarded, and
+ *         the merging step is applied recursively to the (R-M+2) remaining
+ *         runs, using the M memory buffers. (A worked numeric example appears
+ *         after doPass() below.)
+ * 
+ *         Merging also takes the output limit L, if specified, into account.
+ *         In the final pass over the runs (the pass that produces the final
+ *         sorted output), as soon as the output hits the limit L, the process
+ *         stops, closes, and returns.
+ */
+
+public class OptimizedExternalSortRunMerger {
+	private final IHyracksTaskContext ctx;
+	private final List<IFrameReader> runs;
+	private final int[] sortFields;
+	private final IBinaryComparator[] comparators;
+	private final RecordDescriptor recordDesc;
+	private final int framesLimit;
+	private final IFrameWriter writer;
+	private List<ByteBuffer> inFrames;
+	private ByteBuffer outFrame;
+	private FrameTupleAppender outFrameAppender;
+	private FrameTupleAccessor outFrameAccessor;
+	private final int outputLimit;
+	private int currentSize;
+
+	public OptimizedExternalSortRunMerger(IHyracksTaskContext ctx,
+			int outputLimit, List<IFrameReader> runs, int[] sortFields,
+			IBinaryComparator[] comparators, RecordDescriptor recordDesc,
+			int framesLimit, IFrameWriter writer) {
+		this.ctx = ctx;
+		this.runs = new LinkedList<IFrameReader>(runs);
+		this.sortFields = sortFields;
+		this.comparators = comparators;
+		this.recordDesc = recordDesc;
+		this.framesLimit = framesLimit;
+		this.writer = writer;
+		this.outputLimit = outputLimit;
+		this.currentSize = 0;
+	}
+
+	public void process() throws HyracksDataException {
+		writer.open();
+
+		try {
+			outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+					recordDesc);
+			outFrame = ctx.allocateFrame();
+			outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+			outFrameAppender.reset(outFrame, true);
+
+			if (runs.size() == 1) {
+
+				if (outputLimit < 1) {
+					runs.get(0).open();
+					ByteBuffer nextFrame = ctx.allocateFrame();
+					while (runs.get(0).nextFrame(nextFrame)) {
+						FrameUtils.flushFrame(nextFrame, writer);
+						outFrameAppender.reset(nextFrame, true);
+					}
+					return;
+				}
+
+				int totalCount = 0;
+				runs.get(0).open();
+				FrameTupleAccessor fta = new FrameTupleAccessor(
+						ctx.getFrameSize(), recordDesc);
+				ByteBuffer nextFrame = ctx.allocateFrame();
+				while (totalCount <= outputLimit
+						&& runs.get(0).nextFrame(nextFrame)) {
+					fta.reset(nextFrame);
+					int tupCount = fta.getTupleCount();
+					if ((totalCount + tupCount) < outputLimit) {
+						FrameUtils.flushFrame(nextFrame, writer);
+						totalCount += tupCount;
+						continue;
+					}
+					// The very last buffer, which exceeds the limit
+					int copyCount = outputLimit - totalCount;
+					outFrameAppender.reset(outFrame, true);
+					for (int i = 0; i < copyCount; i++) {
+						if (!outFrameAppender.append(fta, i)) {
+							throw new IllegalStateException();
+						}
+						totalCount++;
+					}
+				}
+
+				if (outFrameAppender.getTupleCount() > 0) {
+					FrameUtils.flushFrame(outFrame, writer);
+					outFrameAppender.reset(outFrame, true);
+				}
+
+				return;
+			}
+
+			// More than one run, actual merging is needed
+			inFrames = new ArrayList<ByteBuffer>();
+			for (int i = 0; i < framesLimit - 1; ++i) {
+				inFrames.add(ctx.allocateFrame());
+			}
+			while (runs.size() > 0) {
+				try {
+					doPass(runs);
+				} catch (Exception e) {
+					throw new HyracksDataException(e);
+				}
+			}
+
+		} catch (Exception e) {
+			writer.fail();
+			throw new HyracksDataException(e);
+		} finally {
+			writer.close();
+		}
+	}
+
+	// Merges as many runs as fit in memory; on the final pass the result goes
+	// directly to the output writer, otherwise it forms a new run.
+	private void doPass(List<IFrameReader> runs) throws HyracksDataException {
+		FileReference newRun = null;
+		IFrameWriter writer = this.writer;
+		boolean finalPass = false;
+		if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
+			finalPass = true;
+			for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+				inFrames.remove(i);
+			}
+		} else {
+			newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
+					.getSimpleName());
+			writer = new RunFileWriter(newRun, ctx.getIOManager());
+			writer.open();
+		}
+		try {
+			IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
+			for (int i = 0; i < inFrames.size(); i++) {
+				runCursors[i] = runs.get(i);
+			}
+			RunMergingFrameReader merger = new RunMergingFrameReader(ctx,
+					runCursors, inFrames, sortFields, comparators, recordDesc);
+			merger.open();
+			try {
+				while (merger.nextFrame(outFrame)) {
+					if (outputLimit > 0 && finalPass) {
+						outFrameAccessor.reset(outFrame);
+						int count = outFrameAccessor.getTupleCount();
+						if ((currentSize + count) > outputLimit) {
+							ByteBuffer b = ctx.allocateFrame();
+							FrameTupleAppender partialAppender = new FrameTupleAppender(
+									ctx.getFrameSize());
+							partialAppender.reset(b, true);
+							int copyCount = outputLimit - currentSize;
+							for (int i = 0; i < copyCount; i++) {
+								partialAppender.append(outFrameAccessor, i);
+								currentSize++;
+							}
+							FrameUtils.makeReadable(b);
+							FrameUtils.flushFrame(b, writer);
+							break;
+						} else {
+							FrameUtils.flushFrame(outFrame, writer);
+							currentSize += count;
+						}
+					} else {
+						FrameUtils.flushFrame(outFrame, writer);
+					}
+				}
+			} finally {
+				merger.close();
+			}
+
+			if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
+				runs.clear();
+				return;
+			}
+
+			runs.subList(0, inFrames.size()).clear();
+			if (!finalPass) {
+				runs.add(0, ((RunFileWriter) writer).createReader());
+			}
+		} finally {
+			if (!finalPass) {
+				writer.close();
+			}
+		}
+	}
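+
+	/*
+	 * A worked numeric example of the multi-pass merge, assuming framesLimit
+	 * M = 4 (3 input frames plus 1 output frame) and R = 10 input runs:
+	 *
+	 *   pass 1: merge 3 runs into a new run -> 8 runs remain
+	 *   pass 2: merge 3 runs                -> 6 runs remain
+	 *   pass 3: merge 3 runs                -> 4 runs remain
+	 *   pass 4: merge 3 runs                -> 2 runs remain
+	 *   pass 5: 2 + 1 <= 4, so this is the final pass: the 2 remaining runs
+	 *           are merged directly into the output writer, honoring the
+	 *           output limit if one was specified.
+	 */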
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
new file mode 100644
index 0000000..85bcba2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+/**
+ * @author pouria
+ * 
+ *         Defines a slot in the memory, which can be either a free or an
+ *         allocated slot. Memory is a set of frames, ordered as a list. Each
+ *         tuple is stored in a slot, whose location is denoted by a pair of
+ *         integers:
+ *         - the index of the frame in the list of frames in memory
+ *         - the starting offset of the slot within that frame
+ */
+public class Slot {
+
+	private int frameIx;
+	private int offset;
+
+	public Slot() {
+		this.frameIx = BSTNodeUtil.INVALID_INDEX;
+		this.offset = BSTNodeUtil.INVALID_INDEX;
+	}
+
+	public Slot(int frameIx, int offset) {
+		this.frameIx = frameIx;
+		this.offset = offset;
+	}
+
+	public void set(int frameIx, int offset) {
+		this.frameIx = frameIx;
+		this.offset = offset;
+	}
+
+	public int getFrameIx() {
+		return frameIx;
+	}
+
+	public void setFrameIx(int frameIx) {
+		this.frameIx = frameIx;
+	}
+
+	public int getOffset() {
+		return offset;
+	}
+
+	public void setOffset(int offset) {
+		this.offset = offset;
+	}
+
+	public boolean isNull() {
+		return (frameIx == BSTNodeUtil.INVALID_INDEX)
+				|| (offset == BSTNodeUtil.INVALID_INDEX);
+	}
+
+	public void clear() {
+		this.frameIx = BSTNodeUtil.INVALID_INDEX;
+		this.offset = BSTNodeUtil.INVALID_INDEX;
+	}
+
+	public void copy(Slot s) {
+		this.frameIx = s.getFrameIx();
+		this.offset = s.getOffset();
+	}
+
+	@Override
+	public String toString() {
+		return "(" + frameIx + ", " + offset + ")";
+	}
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
new file mode 100644
index 0000000..d811de8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
@@ -0,0 +1,249 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * @author pouria
+ * 
+ *         Implements a minimum binary heap, used as the selection tree for
+ *         sorting with replacement selection. This heap structure can only be
+ *         used as a min heap (it provides no access to the maximum element).
+ *         Two elements in the heap are compared based on their run numbers and
+ *         their sorting key(s). Considering two elements A and B:
+ * 
+ *         - If RunNumber(A) > RunNumber(B), then A is larger than B.
+ *         - If RunNumber(A) == RunNumber(B), then A is smaller than B iff the
+ *         value of the sort key(s) of A is less than that of B (based on the
+ *         sort comparators).
+ * 
+ */
+public class SortMinHeap implements ISelectionTree {
+
+	static final int RUN_ID_IX = 0;
+	static final int FRAME_IX = 1;
+	static final int OFFSET_IX = 2;
+	final int PNK_IX = 3;
+
+	private final int[] sortFields;
+	private final IBinaryComparator[] comparators;
+	private final RecordDescriptor recordDescriptor;
+	private final FrameTupleAccessor fta1;
+	private final FrameTupleAccessor fta2;
+
+	List<int[]> tree;
+	IMemoryManager memMgr;
+
+	public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields,
+			IBinaryComparatorFactory[] comparatorFactories,
+			RecordDescriptor recordDesc, IMemoryManager memMgr) {
+		this.sortFields = sortFields;
+		this.comparators = new IBinaryComparator[comparatorFactories.length];
+		for (int i = 0; i < comparatorFactories.length; ++i) {
+			this.comparators[i] = comparatorFactories[i]
+					.createBinaryComparator();
+		}
+		this.recordDescriptor = recordDesc;
+		fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+		fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+		this.memMgr = memMgr;
+
+		this.tree = new ArrayList<int[]>();
+	}
+
+	/*
+	 * Assumption (element structure): [RunId][FrameIx][Offset][Poorman NK]
+	 */
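+	/*
+	 * A small illustration of the comparison rules, with entries of the form
+	 * [runId, frameIx, offset, pnk]:
+	 *   A = [0, 2, 128, 7], B = [1, 0, 64, 3]  =>  A < B (A belongs to an
+	 *       earlier run, regardless of its keys).
+	 *   A = [0, 2, 128, 3], B = [0, 5, 256, 7]  =>  A < B (same run, and A's
+	 *       normalized key is smaller).
+	 *   If both the run IDs and the normalized keys are equal, the full sort
+	 *   keys of the two records are compared via the binary comparators,
+	 *   reading the records from the memory manager's frames.
+	 */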
+	@Override
+	public int[] getMin() {
+		return (tree.size() > 0 ? delete(0) : new int[] { -1, -1, -1, -1 });
+	}
+
+	@Override
+	public int[] peekMin() {
+		if (tree.size() == 0) {
+			return (new int[] { -1, -1, -1, -1 });
+		}
+		int[] top = tree.get(0);
+		return new int[] { top[0], top[1], top[2], top[3] };
+	}
+
+	@Override
+	public void insert(int[] e) {
+		tree.add(e);
+		siftUp(tree.size() - 1);
+	}
+
+	@Override
+	public void reset() {
+		this.tree.clear();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return (tree.size() < 1);
+	}
+
+	public int _debugGetSize() {
+		return tree.size();
+	}
+
+	private int[] delete(int nix) {
+		int[] nv = tree.get(nix);
+		int[] last = tree.remove(tree.size() - 1);
+
+		if (tree.size() > 0) {
+			tree.set(nix, last);
+		} else {
+			return nv;
+		}
+
+		int pIx = getParent(nix);
+		if (pIx > -1 && (compare(last, tree.get(pIx)) < 0)) {
+			siftUp(nix);
+		} else {
+			siftDown(nix);
+		}
+		return nv;
+	}
+
+	private void siftUp(int nodeIx) {
+		int p = getParent(nodeIx);
+		if (p < 0) {
+			return;
+		}
+		while (p > -1 && (compare(nodeIx, p) < 0)) {
+			swap(p, nodeIx);
+			nodeIx = p;
+			p = getParent(nodeIx);
+			if (p < 0) { // We are at the root
+				return;
+			}
+		}
+	}
+
+	private void siftDown(int nodeIx) {
+		int mix = getMinOfChildren(nodeIx);
+		if (mix < 0) {
+			return;
+		}
+		while (mix > -1 && (compare(mix, nodeIx) < 0)) {
+			swap(mix, nodeIx);
+			nodeIx = mix;
+			mix = getMinOfChildren(nodeIx);
+			if (mix < 0) { // We hit the leaf level
+				return;
+			}
+		}
+	}
+
+	// first < sec : -1
+	private int compare(int nodeSIx1, int nodeSIx2) {
+		int[] n1 = tree.get(nodeSIx1);
+		int[] n2 = tree.get(nodeSIx2);
+		return (compare(n1, n2));
+	}
+
+	// first < sec : -1
+	private int compare(int[] n1, int[] n2) {
+		// Compare Run Numbers
+		if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
+			return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
+		}
+
+		// Compare Poor man Normalized Keys
+		if (n1[PNK_IX] != n2[PNK_IX]) {
+			return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1
+					: 1;
+		}
+
+		return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]),
+				n1[OFFSET_IX], n2[OFFSET_IX]);
+	}
+
+	private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset,
+			int r2StartOffset) {
+		byte[] b1 = fr1.array();
+		byte[] b2 = fr2.array();
+		fta1.reset(fr1);
+		fta2.reset(fr2);
+		int headerLen = BSTNodeUtil.HEADER_SIZE;
+		r1StartOffset += headerLen;
+		r2StartOffset += headerLen;
+		for (int f = 0; f < comparators.length; ++f) {
+			int fIdx = sortFields[f];
+			int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1)
+					* 4);
+			int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
+			int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
+			int l1 = f1End - f1Start;
+			int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
+					* 4);
+			int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+			int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
+			int l2 = f2End - f2Start;
+
+			int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+
+			if (c != 0) {
+				return c;
+			}
+		}
+		return 0;
+	}
+
+	private int getMinOfChildren(int nix) { // returns index of min child
+		int lix = getLeftChild(nix);
+		if (lix < 0) {
+			return -1;
+		}
+		int rix = getRightChild(nix);
+		if (rix < 0) {
+			return lix;
+		}
+		return ((compare(lix, rix) < 0) ? lix : rix);
+	}
+
+	private void swap(int n1Ix, int n2Ix) {
+		int[] temp = tree.get(n1Ix);
+		tree.set(n1Ix, tree.get(n2Ix));
+		tree.set(n2Ix, temp);
+	}
+
+	private int getLeftChild(int ix) {
+		int lix = 2 * ix + 1;
+		return ((lix < tree.size()) ? lix : -1);
+	}
+
+	private int getRightChild(int ix) {
+		int rix = 2 * ix + 2;
+		return ((rix < tree.size()) ? rix : -1);
+	}
+
+	private int getParent(int ix) {
+		return ((ix - 1) / 2);
+	}
+
+	private ByteBuffer getFrame(int frameIx) {
+		return (memMgr.getFrame(frameIx));
+	}
+
+	@Override
+	public int[] getMax() {
+		System.err.println("getMax() method not implemented for Min Heap");
+		return null;
+	}
+
+	@Override
+	public int[] peekMax() {
+		System.err.println("peekMax() method not implemented for Min Heap");
+		return null;
+	}
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
new file mode 100644
index 0000000..ab66245
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
@@ -0,0 +1,394 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * @author pouria
+ * 
+ *         Implements a min-max binary heap, used as the selection tree in
+ *         sorting with replacement selection. See SortMinHeap for details on
+ *         how elements are compared.
+ */
+public class SortMinMaxHeap implements ISelectionTree {
+	static final int RUN_ID_IX = 0;
+	static final int FRAME_IX = 1;
+	static final int OFFSET_IX = 2;
+	final int PNK_IX = 3;
+
+	static final int NOT_EXIST = -1;
+
+	private final int[] sortFields;
+	private final IBinaryComparator[] comparators;
+	private final RecordDescriptor recordDescriptor;
+	private final FrameTupleAccessor fta1;
+	private final FrameTupleAccessor fta2;
+
+	List<int[]> tree;
+	IMemoryManager memMgr;
+
+	public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields,
+			IBinaryComparatorFactory[] comparatorFactories,
+			RecordDescriptor recordDesc, IMemoryManager memMgr) {
+		this.sortFields = sortFields;
+		this.comparators = new IBinaryComparator[comparatorFactories.length];
+		for (int i = 0; i < comparatorFactories.length; ++i) {
+			this.comparators[i] = comparatorFactories[i]
+					.createBinaryComparator();
+		}
+		this.recordDescriptor = recordDesc;
+		fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+		fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+		this.memMgr = memMgr;
+
+		this.tree = new ArrayList<int[]>();
+	}
+
+	@Override
+	public void insert(int[] element) {
+		tree.add(element);
+		bubbleUp(tree.size() - 1);
+	}
+
+	@Override
+	public int[] getMin() {
+		return (tree.size() > 0 ? delete(0) : new int[] { -1, -1, -1, -1 });
+	}
+
+	@Override
+	public void reset() {
+		this.tree.clear();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return (tree.size() < 1);
+	}
+
+	@Override
+	public int[] peekMin() {
+		if (tree.size() == 0) {
+			return (new int[] { -1, -1, -1, -1 });
+		}
+		int[] top = tree.get(0);
+		return new int[] { top[0], top[1], top[2], top[3] };
+	}
+
+	@Override
+	public int[] getMax() {
+		if (tree.size() == 1) {
+			return tree.remove(0);
+		}
+		if (tree.size() > 1) {
+			int lc = getLeftChild(0);
+			int rc = getRightChild(0);
+			if (rc == -1) {
+				return delete(lc);
+			}
+			return (compare(lc, rc) < 0) ? delete(rc) : delete(lc);
+		}
+		return new int[] { -1, -1, -1, -1 };
+	}
+
+	@Override
+	public int[] peekMax() {
+		if (tree.size() == 1) {
+			int[] t = tree.get(0);
+			return new int[] { t[0], t[1], t[2], t[3] };
+		}
+		if (tree.size() > 1) {
+			int lc = getLeftChild(0);
+			int rc = getRightChild(0);
+			if (rc == -1) {
+				int[] t = tree.get(lc);
+				return new int[] { t[0], t[1], t[2], t[3] };
+			}
+			int[] t = (compare(lc, rc) < 0) ? tree.get(rc) : tree.get(lc);
+			return new int[] { t[0], t[1], t[2], t[3] };
+		}
+		return new int[] { -1, -1, -1, -1 };
+	}
+
+	private int[] delete(int delIx) {
+		int s = tree.size();
+		if (s > 1) {
+			int[] delEntry = tree.get(delIx);
+			int[] last = (tree.remove(s - 1));
+			if (delIx != tree.size()) {
+				tree.set(delIx, last);
+				trickleDown(delIx);
+			}
+			return delEntry;
+		} else if (s == 1) {
+			return tree.remove(0);
+		}
+		return null;
+	}
+
+	private void bubbleUp(int ix) {
+		int p = getParentIx(ix);
+		if (isAtMinLevel(ix)) {
+			if (p != NOT_EXIST && compare(p, ix) < 0) {
+				swap(ix, p);
+				bubbleUpMax(p);
+			} else {
+				bubbleUpMin(ix);
+			}
+		} else { // i is at max level
+			if (p != NOT_EXIST && compare(ix, p) < 0) {
+				swap(ix, p);
+				bubbleUpMin(p);
+			} else {
+				bubbleUpMax(ix);
+			}
+		}
+	}
+
+	private void bubbleUpMax(int ix) {
+		int gp = getGrandParent(ix);
+		if (gp != NOT_EXIST && compare(gp, ix) < 0) {
+			swap(ix, gp);
+			bubbleUpMax(gp);
+		}
+	}
+
+	private void bubbleUpMin(int ix) {
+		int gp = getGrandParent(ix);
+		if (gp != NOT_EXIST && compare(ix, gp) < 0) {
+			swap(ix, gp);
+			bubbleUpMin(gp);
+		}
+	}
+
+	private void trickleDown(int ix) {
+		if (isAtMinLevel(ix)) {
+			trickleDownMin(ix);
+		} else {
+			trickleDownMax(ix);
+		}
+	}
+
+	private void trickleDownMax(int ix) {
+		int maxIx = getMaxOfDescendents(ix);
+		if (maxIx == NOT_EXIST) {
+			return;
+		}
+		if (maxIx > getLeftChild(ix) && maxIx > getRightChild(ix)) { // A grandchild
+			if (compare(ix, maxIx) < 0) {
+				swap(maxIx, ix);
+				int p = getParentIx(maxIx);
+				if (p != NOT_EXIST && compare(maxIx, p) < 0) {
+					swap(maxIx, p);
+				}
+				trickleDownMax(maxIx);
+			}
+		} else { // A direct child
+			if (compare(ix, maxIx) < 0) {
+				swap(ix, maxIx);
+			}
+		}
+	}
+
+	private void trickleDownMin(int ix) {
+		int minIx = getMinOfDescendents(ix);
+		if (minIx == NOT_EXIST) {
+			return;
+		}
+		if (minIx > getLeftChild(ix) && minIx > getRightChild(ix)) { // A grandchild
+			if (compare(minIx, ix) < 0) {
+				swap(minIx, ix);
+				int p = getParentIx(minIx);
+				if (p != NOT_EXIST && compare(p, minIx) < 0) {
+					swap(minIx, p);
+				}
+				trickleDownMin(minIx);
+			}
+		} else { // A direct child
+			if (compare(minIx, ix) < 0) {
+				swap(ix, minIx);
+			}
+		}
+	}
+
+	// Min among children and grand children
+	private int getMinOfDescendents(int ix) {
+		int lc = getLeftChild(ix);
+		if (lc == NOT_EXIST) {
+			return NOT_EXIST;
+		}
+		int rc = getRightChild(ix);
+		if (rc == NOT_EXIST) {
+			return lc;
+		}
+		int min = (compare(lc, rc) < 0) ? lc : rc;
+		int[] lgc = getLeftGrandChildren(ix);
+		int[] rgc = getRightGrandChildren(ix);
+		for (int k = 0; k < 2; k++) {
+			if (lgc[k] != NOT_EXIST && compare(lgc[k], min) < 0) {
+				min = lgc[k];
+			}
+			if (rgc[k] != NOT_EXIST && compare(rgc[k], min) < 0) {
+				min = rgc[k];
+			}
+		}
+		return min;
+	}
+
+	// Max among children and grand children
+	private int getMaxOfDescendents(int ix) {
+		int lc = getLeftChild(ix);
+		if (lc == NOT_EXIST) {
+			return NOT_EXIST;
+		}
+		int rc = getRightChild(ix);
+		if (rc == NOT_EXIST) {
+			return lc;
+		}
+		int max = (compare(lc, rc) < 0) ? rc : lc;
+		int[] lgc = getLeftGrandChildren(ix);
+		int[] rgc = getRightGrandChildren(ix);
+		for (int k = 0; k < 2; k++) {
+			if (lgc[k] != NOT_EXIST && compare(max, lgc[k]) < 0) {
+				max = lgc[k];
+			}
+			if (rgc[k] != NOT_EXIST && compare(max, rgc[k]) < 0) {
+				max = rgc[k];
+			}
+		}
+		return max;
+	}
+
+	private void swap(int n1Ix, int n2Ix) {
+		int[] temp = tree.get(n1Ix);
+		tree.set(n1Ix, tree.get(n2Ix));
+		tree.set(n2Ix, temp);
+	}
+
+	private int getParentIx(int i) {
+		if (i == 0) {
+			return NOT_EXIST;
+		}
+		return (i - 1) / 2;
+	}
+
+	private int getGrandParent(int i) {
+		int p = getParentIx(i);
+		return p != NOT_EXIST ? getParentIx(p) : NOT_EXIST;
+	}
+
+	private int getLeftChild(int i) {
+		int lc = 2 * i + 1;
+		return lc < tree.size() ? lc : NOT_EXIST;
+	}
+
+	private int[] getLeftGrandChildren(int i) {
+		int lc = getLeftChild(i);
+		return lc != NOT_EXIST ? new int[] { getLeftChild(lc),
+				getRightChild(lc) } : new int[] { NOT_EXIST, NOT_EXIST };
+	}
+
+	private int getRightChild(int i) {
+		int rc = 2 * i + 2;
+		return rc < tree.size() ? rc : NOT_EXIST;
+	}
+
+	private int[] getRightGrandChildren(int i) {
+		int rc = getRightChild(i);
+		return rc != NOT_EXIST ? new int[] { getLeftChild(rc),
+				getRightChild(rc) } : new int[] { NOT_EXIST, NOT_EXIST };
+	}
+
+	private boolean isAtMinLevel(int i) {
+		int l = getLevel(i);
+		return l % 2 == 0;
+	}
+
+	private int getLevel(int i) {
+		if (i == 0) {
+			return 0;
+		}
+		int l = (int) Math.floor(Math.log(i) / Math.log(2));
+		if (i == (((int) Math.pow(2, (l + 1))) - 1)) {
+			return (l + 1);
+		}
+		return l;
+	}
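+
+	/*
+	 * A worked example of the level computation above (the root is at level
+	 * 0): index 0 -> level 0, indexes 1-2 -> level 1, indexes 3-6 -> level 2,
+	 * indexes 7-14 -> level 3, and so on. Even levels are "min" levels and odd
+	 * levels are "max" levels, which is what isAtMinLevel() checks.
+	 */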
+
+	private ByteBuffer getFrame(int frameIx) {
+		return (memMgr.getFrame(frameIx));
+	}
+
+	// first < sec : -1
+	private int compare(int nodeSIx1, int nodeSIx2) {
+		int[] n1 = tree.get(nodeSIx1);
+		int[] n2 = tree.get(nodeSIx2);
+		return (compare(n1, n2));
+	}
+
+	// first < sec : -1
+	private int compare(int[] n1, int[] n2) {
+		// Compare Run Numbers
+		if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
+			return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
+		}
+
+		// Compare Poor man Normalized Keys
+		if (n1[PNK_IX] != n2[PNK_IX]) {
+			return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1
+					: 1;
+		}
+
+		return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]),
+				n1[OFFSET_IX], n2[OFFSET_IX]);
+	}
+
+	private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset,
+			int r2StartOffset) {
+		byte[] b1 = fr1.array();
+		byte[] b2 = fr2.array();
+		fta1.reset(fr1);
+		fta2.reset(fr2);
+		int headerLen = BSTNodeUtil.HEADER_SIZE;
+		r1StartOffset += headerLen;
+		r2StartOffset += headerLen;
+		for (int f = 0; f < comparators.length; ++f) {
+			int fIdx = sortFields[f];
+			int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1)
+					* 4);
+			int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
+			int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
+			int l1 = f1End - f1Start;
+			int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
+					* 4);
+			int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+			int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
+			int l2 = f2End - f2Start;
+
+			int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+
+			if (c != 0) {
+				return c;
+			}
+		}
+		return 0;
+	}
+
+	public String _debugPrintTree() {
+		String s = "";
+		for (int i = 0; i < tree.size(); i++) {
+			int[] n = tree.get(i);
+			s += "\t[" + i + "](" + n[RUN_ID_IX] + ", " + n[FRAME_IX] + ", "
+					+ n[OFFSET_IX] + "), ";
+		}
+		return s;
+	}
+}
\ No newline at end of file