Addressing Review Comments for External Sort with Replacement Selection

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@869 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
index 28c3f34..f9dda7f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
@@ -8,790 +8,743 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 
 /**
- * @author pouria Memory Manager based on Binary Search Tree (BST) Free slot
- *         size is the key for the BST nodes. Each node in BST shows a class of
- *         free slots, while all the free slots with the exact same length are
- *         stored as a LinkedList, whose head is a BST node. BST is not stored
- *         as a separate data structure, but the actual free spaces of the
- *         memory slots are used to hold BST nodes. Each BST node has the
- *         logical structure, defined in the BSTNodeUtil class.
+ * @author pouria
+ * 
+ *         Implements Memory Manager based on creating Binary Search Tree (BST)
+ *         while Free slot size is the key for the BST nodes. Each node in BST
+ *         shows a class of free slots, while all the free slots within a class
+ *         have same lengths. Slots in a class are stored as a LinkedList, whose
+ *         head is the BST node, corresponding to that class. BST is not stored
+ *         as a separate data structure, but the free slots in the memory are
+ *         used to hold BST nodes. Each BST node has the logical structure,
+ *         defined in the BSTNodeUtil class.
  * 
  */
 public class BSTMemMgr implements IMemoryManager {
 
-	private final IHyracksCommonContext ctx;
-	public static int FRAME_SIZE;
+    private final IHyracksCommonContext ctx;
+    public static int frameSize;
 
-	private ByteBuffer[] frames;
-	private ByteBuffer convertBuffer;
-	private Slot root;
-	private Slot result; // A reusable object to hold one node returned as
-							// methods result
-	private Slot insertSlot; // A reusable object to hold one node in the insert
-								// process
-	private Slot[] par_res;
-	private int lastFrame;
+    private ByteBuffer[] frames;
+    private ByteBuffer convertBuffer;
+    private Slot root;
+    private Slot result; // A reusable object to hold one node returned as
+                         // method result
+    private Slot insertSlot; // A reusable object to hold one node within insert
+                             // process
 
-	private static int _debug_free_slots = 0;
-	private static int _debug_tree_size = 0;
-	private int _debug_total_lookup_steps;
-	private int _debug_total_lookup_counts;
-	private int _debug_depth_counter;
-	private int _debug_max_depth;
+    private Slot[] parentRes;
+    private int lastFrame;
 
-	public BSTMemMgr(IHyracksCommonContext ctx, int memSize) {
-		this.ctx = ctx;
-		FRAME_SIZE = ctx.getFrameSize();
-		convertBuffer = ByteBuffer.allocate(4);
-		frames = new ByteBuffer[memSize];
-		lastFrame = -1;
-		root = new Slot();
-		insertSlot = new Slot();
-		result = new Slot();
-		par_res = new Slot[] { new Slot(), new Slot() };
-		_debug_total_lookup_counts = 0;
-		_debug_total_lookup_steps = 0;
-	}
+    // Variables used for debugging/performance testing
+    private int debugFreeSlots = 0;
+    private int debugTreeSize = 0;
+    private int debugTotalLookupSteps;
+    private int debugTotalLookupCounts;
+    private int debugDepthCounter;
+    private int debugMaxDepth;
 
-	/**
-	 * result is the container sent by the caller to hold the results
-	 */
-	@Override
-	public void allocate(int length, Slot result) throws HyracksDataException {
-		_debug_total_lookup_counts++;
-		search(length, par_res);
-		if (par_res[1].isNull()) {
-			addFrame(par_res);
-			if (par_res[1].isNull()) {
-				return;
-			}
-		}
+    public BSTMemMgr(IHyracksCommonContext ctx, int memSize) {
+        this.ctx = ctx;
+        frameSize = ctx.getFrameSize();
+        convertBuffer = ByteBuffer.allocate(4);
+        frames = new ByteBuffer[memSize];
+        lastFrame = -1;
+        root = new Slot();
+        insertSlot = new Slot();
+        result = new Slot();
+        parentRes = new Slot[] { new Slot(), new Slot() };
+        debugTotalLookupCounts = 0;
+        debugTotalLookupSteps = 0;
+    }
 
-		int sl = BSTNodeUtil.getLength(par_res[1], frames, convertBuffer);
-		int acLen = BSTNodeUtil.getActualLength(length);
-		if (shouldSplit(sl, acLen)) {
-			int[] s = split(par_res[1], par_res[0], acLen);
-			int insertLen = BSTNodeUtil.getLength(s[2], s[3], frames,
-					convertBuffer);
-			insert(s[2], s[3], insertLen); // inserting second half of the split
-											// slot
-			BSTNodeUtil.setHeaderFooter(s[0], s[1], length, false, frames);
-			result.set(s[0], s[1]);
-			return;
-		}
-		allocate(par_res[1], par_res[0], length, result);
-	}
+    /**
+     * result is the container sent by the caller to hold the results
+     */
+    @Override
+    public void allocate(int length, Slot result) throws HyracksDataException {
+        debugTotalLookupCounts++;
+        search(length, parentRes);
+        if (parentRes[1].isNull()) {
+            addFrame(parentRes);
+            if (parentRes[1].isNull()) {
+                return;
+            }
+        }
 
-	@Override
-	public int unallocate(Slot s) throws HyracksDataException {
-		int usedLen = BSTNodeUtil.getLength(s, frames, convertBuffer);
-		int actualLen = BSTNodeUtil.getActualLength(usedLen);
-		int fix = s.getFrameIx();
-		int off = s.getOffset();
+        int sl = BSTNodeUtil.getLength(parentRes[1], frames, convertBuffer);
+        int acLen = BSTNodeUtil.getActualLength(length);
+        if (shouldSplit(sl, acLen)) {
+            int[] s = split(parentRes[1], parentRes[0], acLen);
+            int insertLen = BSTNodeUtil.getLength(s[2], s[3], frames, convertBuffer);
+            insert(s[2], s[3], insertLen); // inserting second half of the split
+                                           // slot
+            BSTNodeUtil.setHeaderFooter(s[0], s[1], length, false, frames);
+            result.set(s[0], s[1]);
+            return;
+        }
+        allocate(parentRes[1], parentRes[0], length, result);
+    }
 
-		int prevMemSlotFooter_off = ((off - BSTNodeUtil.HEADER_SIZE) >= 0 ? (off - BSTNodeUtil.HEADER_SIZE)
-				: BSTNodeUtil.INVALID_INDEX);
-		int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
-		int nextMemSlotHeader_off = (t < FRAME_SIZE ? t
-				: BSTNodeUtil.INVALID_INDEX);
-		// Remember: next and prev memory slots have the same frame index as the
-		// unallocating slot
-		if (!isNodeNull(fix, prevMemSlotFooter_off)
-				&& BSTNodeUtil.isFree(fix, prevMemSlotFooter_off, frames)) {
-			int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooter_off,
-					frames, convertBuffer);
-			removeFromList(fix, prevMemSlotFooter_off - leftLength
-					- BSTNodeUtil.HEADER_SIZE);
-			int concatLength = actualLen + leftLength + 2
-					* BSTNodeUtil.HEADER_SIZE;
-			if (!isNodeNull(fix, nextMemSlotHeader_off)
-					&& BSTNodeUtil.isFree(fix, nextMemSlotHeader_off, frames)) {
-				removeFromList(fix, nextMemSlotHeader_off);
-				concatLength += BSTNodeUtil.getLength(fix,
-						nextMemSlotHeader_off, frames, convertBuffer)
-						+ 2
-						* BSTNodeUtil.HEADER_SIZE;
-			}
-			insert(fix, prevMemSlotFooter_off - leftLength
-					- BSTNodeUtil.HEADER_SIZE, concatLength); // newly (merged)
-																// slot starts
-																// at the prev
-																// slot offset
-			return concatLength;
+    @Override
+    public int unallocate(Slot s) throws HyracksDataException {
+        int usedLen = BSTNodeUtil.getLength(s, frames, convertBuffer);
+        int actualLen = BSTNodeUtil.getActualLength(usedLen);
+        int fix = s.getFrameIx();
+        int off = s.getOffset();
 
-		} else if (!isNodeNull(fix, nextMemSlotHeader_off)
-				&& BSTNodeUtil.isFree(fix, nextMemSlotHeader_off, frames)) {
-			removeFromList(fix, nextMemSlotHeader_off);
-			int concatLength = actualLen
-					+ BSTNodeUtil.getLength(fix, nextMemSlotHeader_off, frames,
-							convertBuffer) + 2 * BSTNodeUtil.HEADER_SIZE;
-			insert(fix, off, concatLength); // newly (merged) slot starts at the
-											// unallocating slot offset
-			return concatLength;
-		}
-		// unallocating slot is not merging with any neighbor
-		insert(fix, off, actualLen);
-		return actualLen;
-	}
+        int prevMemSlotFooterOffset = ((off - BSTNodeUtil.HEADER_SIZE) >= 0 ? (off - BSTNodeUtil.HEADER_SIZE)
+                : BSTNodeUtil.INVALID_INDEX);
+        int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
+        int nextMemSlotHeaderOffset = (t < frameSize ? t : BSTNodeUtil.INVALID_INDEX);
+        // Remember: next and prev memory slots have the same frame index as the
+        // unallocated slot
+        if (!isNodeNull(fix, prevMemSlotFooterOffset) && BSTNodeUtil.isFree(fix, prevMemSlotFooterOffset, frames)) {
+            int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooterOffset, frames, convertBuffer);
+            removeFromList(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE);
+            int concatLength = actualLen + leftLength + 2 * BSTNodeUtil.HEADER_SIZE;
+            if (!isNodeNull(fix, nextMemSlotHeaderOffset) && BSTNodeUtil.isFree(fix, nextMemSlotHeaderOffset, frames)) {
+                removeFromList(fix, nextMemSlotHeaderOffset);
+                concatLength += BSTNodeUtil.getLength(fix, nextMemSlotHeaderOffset, frames, convertBuffer) + 2
+                        * BSTNodeUtil.HEADER_SIZE;
+            }
+            insert(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE, concatLength); // newly
+                                                                                                       // (merged)
+                                                                                                       // slot
+                                                                                                       // starts
+                                                                                                       // at
+                                                                                                       // the
+                                                                                                       // prev
+                                                                                                       // slot
+                                                                                                       // offset
+            return concatLength;
 
-	@Override
-	public boolean readTuple(int frameIx, int offset, FrameTupleAppender dest) {
-		int offToRead = offset + BSTNodeUtil.HEADER_SIZE;
-		int length = BSTNodeUtil.getLength(frameIx, offset, frames,
-				convertBuffer);
-		return dest.append(frames[frameIx].array(), offToRead, length);
-	}
+        } else if (!isNodeNull(fix, nextMemSlotHeaderOffset)
+                && BSTNodeUtil.isFree(fix, nextMemSlotHeaderOffset, frames)) {
+            removeFromList(fix, nextMemSlotHeaderOffset);
+            int concatLength = actualLen + BSTNodeUtil.getLength(fix, nextMemSlotHeaderOffset, frames, convertBuffer)
+                    + 2 * BSTNodeUtil.HEADER_SIZE;
+            insert(fix, off, concatLength); // newly (merged) slot starts at the
+                                            // unallocated slot offset
+            return concatLength;
+        }
+        // unallocated slot is not merging with any neighbor
+        insert(fix, off, actualLen);
+        return actualLen;
+    }
 
-	@Override
-	public boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src,
-			int tIndex) {
-		int offToCopy = offset + BSTNodeUtil.HEADER_SIZE;
-		int tStartOffset = src.getTupleStartOffset(tIndex);
-		int tEndOffset = src.getTupleEndOffset(tIndex);
-		int tupleLength = tEndOffset - tStartOffset;
-		ByteBuffer srcBuffer = src.getBuffer();
-		System.arraycopy(srcBuffer.array(), tStartOffset,
-				frames[frameIx].array(), offToCopy, tupleLength);
-		return true;
-	}
+    @Override
+    public boolean readTuple(int frameIx, int offset, FrameTupleAppender dest) {
+        int offToRead = offset + BSTNodeUtil.HEADER_SIZE;
+        int length = BSTNodeUtil.getLength(frameIx, offset, frames, convertBuffer);
+        return dest.append(frames[frameIx].array(), offToRead, length);
+    }
 
-	@Override
-	public ByteBuffer getFrame(int frameIndex) {
-		return frames[frameIndex];
-	}
+    @Override
+    public boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src, int tIndex) {
+        int offToCopy = offset + BSTNodeUtil.HEADER_SIZE;
+        int tStartOffset = src.getTupleStartOffset(tIndex);
+        int tEndOffset = src.getTupleEndOffset(tIndex);
+        int tupleLength = tEndOffset - tStartOffset;
+        ByteBuffer srcBuffer = src.getBuffer();
+        System.arraycopy(srcBuffer.array(), tStartOffset, frames[frameIx].array(), offToCopy, tupleLength);
+        return true;
+    }
 
-	public String _debug_getAvgSearchPath() {
-		double avg = (((double) _debug_total_lookup_steps) / ((double) _debug_total_lookup_counts));
-		return "\nTotal allocation requests:\t" + _debug_total_lookup_counts
-				+ "\nAvg Allocation Path Length:\t" + avg
-				+ "\nMax BST Depth:\t" + _debug_max_depth;
-	}
+    @Override
+    public ByteBuffer getFrame(int frameIndex) {
+        return frames[frameIndex];
+    }
 
-	public void _debug_decLookupCount() {
-		_debug_total_lookup_counts--;
-	}
+    public String debugGetAvgSearchPath() {
+        double avg = (((double) debugTotalLookupSteps) / ((double) debugTotalLookupCounts));
+        return "\nTotal allocation requests:\t" + debugTotalLookupCounts + "\nAvg Allocation Path Length:\t" + avg
+                + "\nMax BST Depth:\t" + debugMaxDepth;
+    }
 
-	/**
-	 * 
-	 * @param p_r
-	 *            is the container passed by the caller to contain the results
-	 * @throws HyracksDataException
-	 */
-	private void addFrame(Slot[] p_r) throws HyracksDataException {
-		_debug_depth_counter = 0;
-		clear(p_r);
-		if ((lastFrame + 1) >= frames.length) {
-			return;
-		}
-		frames[++lastFrame] = allocateFrame();
-		int l = FRAME_SIZE - 2 * BSTNodeUtil.HEADER_SIZE;
-		BSTNodeUtil.setHeaderFooter(lastFrame, 0, l, true, frames);
-		initNewNode(lastFrame, 0);
+    public void debugDecLookupCount() {
+        debugTotalLookupCounts--;
+    }
 
-		p_r[1].copy(root);
-		if (p_r[1].isNull()) { // root is null
-			root.set(lastFrame, 0);
-			initNewNode(root.getFrameIx(), root.getOffset());
-			p_r[1].copy(root);
-			return;
-		}
+    /**
+     * 
+     * @param parentResult
+     *            is the container passed by the caller to contain the results
+     * @throws HyracksDataException
+     */
+    private void addFrame(Slot[] parentResult) throws HyracksDataException {
+        debugDepthCounter = 0;
+        clear(parentResult);
+        if ((lastFrame + 1) >= frames.length) {
+            return;
+        }
+        frames[++lastFrame] = allocateFrame();
+        int l = frameSize - 2 * BSTNodeUtil.HEADER_SIZE;
+        BSTNodeUtil.setHeaderFooter(lastFrame, 0, l, true, frames);
+        initNewNode(lastFrame, 0);
 
-		while (!p_r[1].isNull()) {
-			_debug_depth_counter++;
-			if (BSTNodeUtil.getLength(p_r[1], frames, convertBuffer) == l) {
-				append(p_r[1].getFrameIx(), p_r[1].getOffset(), lastFrame, 0);
-				p_r[1].set(lastFrame, 0);
-				if (_debug_depth_counter > _debug_max_depth) {
-					_debug_max_depth = _debug_depth_counter;
-				}
-				return;
-			}
-			if (l < BSTNodeUtil.getLength(p_r[1], frames, convertBuffer)) {
-				if (isNodeNull(BSTNodeUtil.leftChild_fIx(p_r[1], frames,
-						convertBuffer), BSTNodeUtil.leftChild_offset(p_r[1],
-						frames, convertBuffer))) {
-					BSTNodeUtil.setLeftChild(p_r[1].getFrameIx(),
-							p_r[1].getOffset(), lastFrame, 0, frames);
-					p_r[0].copy(p_r[1]);
-					p_r[1].set(lastFrame, 0);
-					if (_debug_depth_counter > _debug_max_depth) {
-						_debug_max_depth = _debug_depth_counter;
-					}
-					return;
-				} else {
-					p_r[0].copy(p_r[1]);
-					p_r[1].set(BSTNodeUtil.leftChild_fIx(p_r[1], frames,
-							convertBuffer), BSTNodeUtil.leftChild_offset(
-							p_r[1], frames, convertBuffer));
-				}
-			} else {
-				if (isNodeNull(BSTNodeUtil.rightChild_fIx(p_r[1], frames,
-						convertBuffer), BSTNodeUtil.rightChild_offset(p_r[1],
-						frames, convertBuffer))) {
-					BSTNodeUtil.setRightChild(p_r[1].getFrameIx(),
-							p_r[1].getOffset(), lastFrame, 0, frames);
-					p_r[0].copy(p_r[1]);
-					p_r[1].set(lastFrame, 0);
-					if (_debug_depth_counter > _debug_max_depth) {
-						_debug_max_depth = _debug_depth_counter;
-					}
-					return;
-				} else {
-					p_r[0].copy(p_r[1]);
-					p_r[1].set(BSTNodeUtil.rightChild_fIx(p_r[1], frames,
-							convertBuffer), BSTNodeUtil.rightChild_offset(
-							p_r[1], frames, convertBuffer));
-				}
-			}
-		}
-		throw new HyracksDataException(
-				"New Frame could not be added to BSTMemMgr");
-	}
+        parentResult[1].copy(root);
+        if (parentResult[1].isNull()) { // root is null
+            root.set(lastFrame, 0);
+            initNewNode(root.getFrameIx(), root.getOffset());
+            parentResult[1].copy(root);
+            return;
+        }
 
-	private void insert(int fix, int off, int length)
-			throws HyracksDataException {
-		_debug_depth_counter = 0;
-		BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
-		initNewNode(fix, off);
+        while (!parentResult[1].isNull()) {
+            debugDepthCounter++;
+            if (BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer) == l) {
+                append(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0);
+                parentResult[1].set(lastFrame, 0);
+                if (debugDepthCounter > debugMaxDepth) {
+                    debugMaxDepth = debugDepthCounter;
+                }
+                return;
+            }
+            if (l < BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer)) {
+                if (isNodeNull(BSTNodeUtil.getLeftChildFrameIx(parentResult[1], frames, convertBuffer),
+                        BSTNodeUtil.getLeftChildOffset(parentResult[1], frames, convertBuffer))) {
+                    BSTNodeUtil.setLeftChild(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0,
+                            frames);
+                    parentResult[0].copy(parentResult[1]);
+                    parentResult[1].set(lastFrame, 0);
+                    if (debugDepthCounter > debugMaxDepth) {
+                        debugMaxDepth = debugDepthCounter;
+                    }
+                    return;
+                } else {
+                    parentResult[0].copy(parentResult[1]);
+                    parentResult[1].set(BSTNodeUtil.getLeftChildFrameIx(parentResult[1], frames, convertBuffer),
+                            BSTNodeUtil.getLeftChildOffset(parentResult[1], frames, convertBuffer));
+                }
+            } else {
+                if (isNodeNull(BSTNodeUtil.getRightChildFrameIx(parentResult[1], frames, convertBuffer),
+                        BSTNodeUtil.getRightChildOffset(parentResult[1], frames, convertBuffer))) {
+                    BSTNodeUtil.setRightChild(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0,
+                            frames);
+                    parentResult[0].copy(parentResult[1]);
+                    parentResult[1].set(lastFrame, 0);
+                    if (debugDepthCounter > debugMaxDepth) {
+                        debugMaxDepth = debugDepthCounter;
+                    }
+                    return;
+                } else {
+                    parentResult[0].copy(parentResult[1]);
+                    parentResult[1].set(BSTNodeUtil.getRightChildFrameIx(parentResult[1], frames, convertBuffer),
+                            BSTNodeUtil.getRightChildOffset(parentResult[1], frames, convertBuffer));
+                }
+            }
+        }
+        throw new HyracksDataException("New Frame could not be added to BSTMemMgr");
+    }
 
-		if (root.isNull()) {
-			root.set(fix, off);
-			return;
-		}
+    private void insert(int fix, int off, int length) throws HyracksDataException {
+        debugDepthCounter = 0;
+        BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
+        initNewNode(fix, off);
 
-		insertSlot.clear();
-		insertSlot.copy(root);
-		while (!insertSlot.isNull()) {
-			_debug_depth_counter++;
-			int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames,
-					convertBuffer);
-			if (curSlotLen == length) {
-				append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix,
-						off);
-				if (_debug_depth_counter > _debug_max_depth) {
-					_debug_max_depth = _debug_depth_counter;
-				}
-				return;
-			}
-			if (length < curSlotLen) {
-				int lc_fix = BSTNodeUtil.leftChild_fIx(insertSlot, frames,
-						convertBuffer);
-				int lc_off = BSTNodeUtil.leftChild_offset(insertSlot, frames,
-						convertBuffer);
-				if (isNodeNull(lc_fix, lc_off)) {
-					initNewNode(fix, off);
-					BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(),
-							insertSlot.getOffset(), fix, off, frames);
-					if (_debug_depth_counter > _debug_max_depth) {
-						_debug_max_depth = _debug_depth_counter;
-					}
-					return;
-				} else {
-					insertSlot.set(lc_fix, lc_off);
-				}
-			} else {
-				int rc_fix = BSTNodeUtil.rightChild_fIx(insertSlot, frames,
-						convertBuffer);
-				int rc_off = BSTNodeUtil.rightChild_offset(insertSlot, frames,
-						convertBuffer);
-				if (isNodeNull(rc_fix, rc_off)) {
-					initNewNode(fix, off);
-					BSTNodeUtil.setRightChild(insertSlot.getFrameIx(),
-							insertSlot.getOffset(), fix, off, frames);
-					if (_debug_depth_counter > _debug_max_depth) {
-						_debug_max_depth = _debug_depth_counter;
-					}
-					return;
-				} else {
-					insertSlot.set(rc_fix, rc_off);
-				}
-			}
-		}
-		throw new HyracksDataException(
-				"Failure in node insertion into BST in BSTMemMgr");
-	}
+        if (root.isNull()) {
+            root.set(fix, off);
+            return;
+        }
 
-	/**
-	 * @param length
-	 * @param target
-	 *            is the container sent by the caller to hold the results
-	 */
-	private void search(int length, Slot[] target) {
-		clear(target);
-		result.clear();
+        insertSlot.clear();
+        insertSlot.copy(root);
+        while (!insertSlot.isNull()) {
+            debugDepthCounter++;
+            int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames, convertBuffer);
+            if (curSlotLen == length) {
+                append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off);
+                if (debugDepthCounter > debugMaxDepth) {
+                    debugMaxDepth = debugDepthCounter;
+                }
+                return;
+            }
+            if (length < curSlotLen) {
+                int leftChildFIx = BSTNodeUtil.getLeftChildFrameIx(insertSlot, frames, convertBuffer);
+                int leftChildOffset = BSTNodeUtil.getLeftChildOffset(insertSlot, frames, convertBuffer);
+                if (isNodeNull(leftChildFIx, leftChildOffset)) {
+                    initNewNode(fix, off);
+                    BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
+                    if (debugDepthCounter > debugMaxDepth) {
+                        debugMaxDepth = debugDepthCounter;
+                    }
+                    return;
+                } else {
+                    insertSlot.set(leftChildFIx, leftChildOffset);
+                }
+            } else {
+                int rightChildFIx = BSTNodeUtil.getRightChildFrameIx(insertSlot, frames, convertBuffer);
+                int rightChildOffset = BSTNodeUtil.getRightChildOffset(insertSlot, frames, convertBuffer);
+                if (isNodeNull(rightChildFIx, rightChildOffset)) {
+                    initNewNode(fix, off);
+                    BSTNodeUtil.setRightChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
+                    if (debugDepthCounter > debugMaxDepth) {
+                        debugMaxDepth = debugDepthCounter;
+                    }
+                    return;
+                } else {
+                    insertSlot.set(rightChildFIx, rightChildOffset);
+                }
+            }
+        }
+        throw new HyracksDataException("Failure in node insertion into BST in BSTMemMgr");
+    }
 
-		if (root.isNull()) {
-			return;
-		}
+    /**
+     * @param length
+     * @param target
+     *            is the container sent by the caller to hold the results
+     */
+    private void search(int length, Slot[] target) {
+        clear(target);
+        result.clear();
 
-		Slot lastLeftParent = new Slot();
-		Slot lastLeft = new Slot();
-		Slot parent = new Slot();
-		result.copy(root);
+        if (root.isNull()) {
+            return;
+        }
 
-		while (!result.isNull()) {
-			_debug_total_lookup_steps++;
-			if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) {
-				target[0].copy(parent);
-				target[1].copy(result);
-				return;
-			}
-			if (length < BSTNodeUtil.getLength(result, frames, convertBuffer)) {
-				lastLeftParent.copy(parent);
-				lastLeft.copy(result);
-				parent.copy(result);
-				int fix = BSTNodeUtil.leftChild_fIx(result, frames,
-						convertBuffer);
-				int off = BSTNodeUtil.leftChild_offset(result, frames,
-						convertBuffer);
-				result.set(fix, off);
-			} else {
-				parent.copy(result);
-				int fix = BSTNodeUtil.rightChild_fIx(result, frames,
-						convertBuffer);
-				int off = BSTNodeUtil.rightChild_offset(result, frames,
-						convertBuffer);
-				result.set(fix, off);
-			}
-		}
+        Slot lastLeftParent = new Slot();
+        Slot lastLeft = new Slot();
+        Slot parent = new Slot();
+        result.copy(root);
 
-		target[0].copy(lastLeftParent);
-		target[1].copy(lastLeft);
+        while (!result.isNull()) {
+            debugTotalLookupSteps++;
+            if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) {
+                target[0].copy(parent);
+                target[1].copy(result);
+                return;
+            }
+            if (length < BSTNodeUtil.getLength(result, frames, convertBuffer)) {
+                lastLeftParent.copy(parent);
+                lastLeft.copy(result);
+                parent.copy(result);
+                int fix = BSTNodeUtil.getLeftChildFrameIx(result, frames, convertBuffer);
+                int off = BSTNodeUtil.getLeftChildOffset(result, frames, convertBuffer);
+                result.set(fix, off);
+            } else {
+                parent.copy(result);
+                int fix = BSTNodeUtil.getRightChildFrameIx(result, frames, convertBuffer);
+                int off = BSTNodeUtil.getRightChildOffset(result, frames, convertBuffer);
+                result.set(fix, off);
+            }
+        }
 
-	}
+        target[0].copy(lastLeftParent);
+        target[1].copy(lastLeft);
 
-	private void append(int headFix, int headOff, int nodeFix, int nodeOff) {
-		initNewNode(nodeFix, nodeOff);
+    }
 
-		int fix = BSTNodeUtil.next_fIx(headFix, headOff, frames, convertBuffer); // frameIx
-																					// for
-																					// the
-																					// current
-																					// next
-																					// of
-																					// head
-		int off = BSTNodeUtil.next_offset(headFix, headOff, frames,
-				convertBuffer); // offset for the current next of head
-		BSTNodeUtil.setNext(nodeFix, nodeOff, fix, off, frames);
+    private void append(int headFix, int headOff, int nodeFix, int nodeOff) {
+        initNewNode(nodeFix, nodeOff);
 
-		if (!isNodeNull(fix, off)) {
-			BSTNodeUtil.setPrev(fix, off, nodeFix, nodeOff, frames);
-		}
-		BSTNodeUtil.setPrev(nodeFix, nodeOff, headFix, headOff, frames);
-		BSTNodeUtil.setNext(headFix, headOff, nodeFix, nodeOff, frames);
-	}
+        int fix = BSTNodeUtil.getNextFrameIx(headFix, headOff, frames, convertBuffer); // frameIx
+        // for
+        // the
+        // current
+        // next
+        // of
+        // head
+        int off = BSTNodeUtil.getNextOffset(headFix, headOff, frames, convertBuffer); // offset
+                                                                                      // for
+                                                                                      // the
+                                                                                      // current
+                                                                                      // next
+                                                                                      // of
+                                                                                      // head
+        BSTNodeUtil.setNext(nodeFix, nodeOff, fix, off, frames);
 
-	private int[] split(Slot listHead, Slot parent, int length) {
-		int l2 = BSTNodeUtil.getLength(listHead, frames, convertBuffer)
-				- length - 2 * BSTNodeUtil.HEADER_SIZE;
-		// We split the node after slots-list head
-		if (!isNodeNull(BSTNodeUtil.next_fIx(listHead, frames, convertBuffer),
-				BSTNodeUtil.next_offset(listHead, frames, convertBuffer))) {
-			int afterHeadFix = BSTNodeUtil.next_fIx(listHead, frames,
-					convertBuffer);
-			int afterHeadOff = BSTNodeUtil.next_offset(listHead, frames,
-					convertBuffer);
-			int afHNextFix = BSTNodeUtil.next_fIx(afterHeadFix, afterHeadOff,
-					frames, convertBuffer);
-			int afHNextOff = BSTNodeUtil.next_offset(afterHeadFix,
-					afterHeadOff, frames, convertBuffer);
-			BSTNodeUtil.setNext(listHead.getFrameIx(), listHead.getOffset(),
-					afHNextFix, afHNextOff, frames);
-			if (!isNodeNull(afHNextFix, afHNextOff)) {
-				BSTNodeUtil.setPrev(afHNextFix, afHNextOff,
-						listHead.getFrameIx(), listHead.getOffset(), frames);
-			}
-			int secondOffset = afterHeadOff + length + 2
-					* BSTNodeUtil.HEADER_SIZE;
-			BSTNodeUtil.setHeaderFooter(afterHeadFix, afterHeadOff, length,
-					true, frames);
-			BSTNodeUtil.setHeaderFooter(afterHeadFix, secondOffset, l2, true,
-					frames);
+        if (!isNodeNull(fix, off)) {
+            BSTNodeUtil.setPrev(fix, off, nodeFix, nodeOff, frames);
+        }
+        BSTNodeUtil.setPrev(nodeFix, nodeOff, headFix, headOff, frames);
+        BSTNodeUtil.setNext(headFix, headOff, nodeFix, nodeOff, frames);
+    }
 
-			return new int[] { afterHeadFix, afterHeadOff, afterHeadFix,
-					secondOffset };
-		}
-		// We split the head
-		int secondOffset = listHead.getOffset() + length + 2
-				* BSTNodeUtil.HEADER_SIZE;
-		BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(),
-				listHead.getOffset(), length, true, frames);
-		BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), secondOffset, l2,
-				true, frames);
+    private int[] split(Slot listHead, Slot parent, int length) {
+        int l2 = BSTNodeUtil.getLength(listHead, frames, convertBuffer) - length - 2 * BSTNodeUtil.HEADER_SIZE;
+        // We split the node after slots-list head
+        if (!isNodeNull(BSTNodeUtil.getNextFrameIx(listHead, frames, convertBuffer),
+                BSTNodeUtil.getNextOffset(listHead, frames, convertBuffer))) {
+            int afterHeadFix = BSTNodeUtil.getNextFrameIx(listHead, frames, convertBuffer);
+            int afterHeadOff = BSTNodeUtil.getNextOffset(listHead, frames, convertBuffer);
+            int afHNextFix = BSTNodeUtil.getNextFrameIx(afterHeadFix, afterHeadOff, frames, convertBuffer);
+            int afHNextOff = BSTNodeUtil.getNextOffset(afterHeadFix, afterHeadOff, frames, convertBuffer);
+            BSTNodeUtil.setNext(listHead.getFrameIx(), listHead.getOffset(), afHNextFix, afHNextOff, frames);
+            if (!isNodeNull(afHNextFix, afHNextOff)) {
+                BSTNodeUtil.setPrev(afHNextFix, afHNextOff, listHead.getFrameIx(), listHead.getOffset(), frames);
+            }
+            int secondOffset = afterHeadOff + length + 2 * BSTNodeUtil.HEADER_SIZE;
+            BSTNodeUtil.setHeaderFooter(afterHeadFix, afterHeadOff, length, true, frames);
+            BSTNodeUtil.setHeaderFooter(afterHeadFix, secondOffset, l2, true, frames);
 
-		fixTreePtrs(listHead.getFrameIx(), listHead.getOffset(),
-				parent.getFrameIx(), parent.getOffset());
-		return new int[] { listHead.getFrameIx(), listHead.getOffset(),
-				listHead.getFrameIx(), secondOffset };
-	}
+            return new int[] { afterHeadFix, afterHeadOff, afterHeadFix, secondOffset };
+        }
+        // We split the head
+        int secondOffset = listHead.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE;
+        BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), listHead.getOffset(), length, true, frames);
+        BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), secondOffset, l2, true, frames);
 
-	private void fixTreePtrs(int node_fix, int node_off, int par_fix,
-			int par_off) {
-		int nlc_fix = BSTNodeUtil.leftChild_fIx(node_fix, node_off, frames,
-				convertBuffer);
-		int nlc_off = BSTNodeUtil.leftChild_offset(node_fix, node_off, frames,
-				convertBuffer);
-		int nrc_fix = BSTNodeUtil.rightChild_fIx(node_fix, node_off, frames,
-				convertBuffer);
-		int nrc_off = BSTNodeUtil.rightChild_offset(node_fix, node_off, frames,
-				convertBuffer);
+        fixTreePtrs(listHead.getFrameIx(), listHead.getOffset(), parent.getFrameIx(), parent.getOffset());
+        return new int[] { listHead.getFrameIx(), listHead.getOffset(), listHead.getFrameIx(), secondOffset };
+    }
 
-		int status = -1; // (status==0 if node is left child of parent)
-							// (status==1 if node is right child of parent)
-		if (!isNodeNull(par_fix, par_off)) {
-			int nlen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
-					node_fix, node_off, frames, convertBuffer));
-			int plen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
-					par_fix, par_off, frames, convertBuffer));
-			status = ((nlen < plen) ? 0 : 1);
-		}
+    private void fixTreePtrs(int nodeFrameIx, int nodeOffset, int parentFrameIx, int parentOffset) {
+        int nodeLeftChildFrameIx = BSTNodeUtil.getLeftChildFrameIx(nodeFrameIx, nodeOffset, frames, convertBuffer);
+        int nodeLeftChildOffset = BSTNodeUtil.getLeftChildOffset(nodeFrameIx, nodeOffset, frames, convertBuffer);
+        int nodeRightChildFrameIx = BSTNodeUtil.getRightChildFrameIx(nodeFrameIx, nodeOffset, frames, convertBuffer);
+        int nodeRightChildOffset = BSTNodeUtil.getRightChildOffset(nodeFrameIx, nodeOffset, frames, convertBuffer);
 
-		if (!isNodeNull(nlc_fix, nlc_off) && !isNodeNull(nrc_fix, nrc_off)) { // Node
-																				// has
-																				// two
-																				// children
-			int pmin_fix = node_fix;
-			int pmin_off = node_off;
-			int min_fix = nrc_fix;
-			int min_off = nrc_off;
-			int next_left_fix = BSTNodeUtil.leftChild_fIx(min_fix, min_off,
-					frames, convertBuffer);
-			int next_left_off = BSTNodeUtil.leftChild_offset(min_fix, min_off,
-					frames, convertBuffer);
+        int status = -1; // (status==0 if node is left child of parent)
+                         // (status==1 if node is right child of parent)
+        if (!isNodeNull(parentFrameIx, parentOffset)) {
+            int nlen = BSTNodeUtil.getActualLength(BSTNodeUtil
+                    .getLength(nodeFrameIx, nodeOffset, frames, convertBuffer));
+            int plen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(parentFrameIx, parentOffset, frames,
+                    convertBuffer));
+            status = ((nlen < plen) ? 0 : 1);
+        }
 
-			while (!isNodeNull(next_left_fix, next_left_off)) {
-				pmin_fix = min_fix;
-				pmin_off = min_off;
-				min_fix = next_left_fix;
-				min_off = next_left_off;
-				next_left_fix = BSTNodeUtil.leftChild_fIx(min_fix, min_off,
-						frames, convertBuffer); // min is now pointing to
-												// current (old) next left
-				next_left_off = BSTNodeUtil.leftChild_offset(min_fix, min_off,
-						frames, convertBuffer); // min is now pointing to
-												// current (old) next left
-			}
+        if (!isNodeNull(nodeLeftChildFrameIx, nodeLeftChildOffset)
+                && !isNodeNull(nodeRightChildFrameIx, nodeRightChildOffset)) { // Node
+            // has
+            // two
+            // children
+            int pMinFIx = nodeFrameIx;
+            int pMinOff = nodeOffset;
+            int minFIx = nodeRightChildFrameIx;
+            int minOff = nodeRightChildOffset;
+            int nextLeftFIx = BSTNodeUtil.getLeftChildFrameIx(minFIx, minOff, frames, convertBuffer);
+            int nextLeftOff = BSTNodeUtil.getLeftChildOffset(minFIx, minOff, frames, convertBuffer);
 
-			if ((nrc_fix == min_fix) && (nrc_off == min_off)) { // nrc is the
-																// same as min
-				BSTNodeUtil.setLeftChild(nrc_fix, nrc_off, nlc_fix, nlc_off,
-						frames);
-			} else { // min is different from nrc
-				int min_right_fix = BSTNodeUtil.rightChild_fIx(min_fix,
-						min_off, frames, convertBuffer);
-				int min_right_off = BSTNodeUtil.rightChild_offset(min_fix,
-						min_off, frames, convertBuffer);
-				BSTNodeUtil.setRightChild(min_fix, min_off, nrc_fix, nrc_off,
-						frames);
-				BSTNodeUtil.setLeftChild(min_fix, min_off, nlc_fix, nlc_off,
-						frames);
-				BSTNodeUtil.setLeftChild(pmin_fix, pmin_off, min_right_fix,
-						min_right_off, frames);
-			}
+            while (!isNodeNull(nextLeftFIx, nextLeftOff)) {
+                pMinFIx = minFIx;
+                pMinOff = minOff;
+                minFIx = nextLeftFIx;
+                minOff = nextLeftOff;
+                nextLeftFIx = BSTNodeUtil.getLeftChildFrameIx(minFIx, minOff, frames, convertBuffer); // min
+                                                                                                      // is
+                                                                                                      // now
+                                                                                                      // pointing
+                                                                                                      // to
+                                                                                                      // current
+                                                                                                      // (old)
+                                                                                                      // next
+                                                                                                      // left
+                nextLeftOff = BSTNodeUtil.getLeftChildOffset(minFIx, minOff, frames, convertBuffer); // min
+                                                                                                     // is
+                                                                                                     // now
+                                                                                                     // pointing
+                                                                                                     // to
+                                                                                                     // current
+                                                                                                     // (old)
+                                                                                                     // next
+                                                                                                     // left
+            }
 
-			// Now dealing with the parent
-			if (!isNodeNull(par_fix, par_off)) {
-				if (status == 0) {
-					BSTNodeUtil.setLeftChild(par_fix, par_off, min_fix,
-							min_off, frames);
-				} else if (status == 1) {
-					BSTNodeUtil.setRightChild(par_fix, par_off, min_fix,
-							min_off, frames);
-				}
-			} else { // No parent (node was the root)
-				root.set(min_fix, min_off);
-			}
-			return;
-		}
+            if ((nodeRightChildFrameIx == minFIx) && (nodeRightChildOffset == minOff)) { // nrc
+                                                                                         // is
+                                                                                         // the
+                // same as min
+                BSTNodeUtil.setLeftChild(nodeRightChildFrameIx, nodeRightChildOffset, nodeLeftChildFrameIx,
+                        nodeLeftChildOffset, frames);
+            } else { // min is different from nrc
+                int minRightFIx = BSTNodeUtil.getRightChildFrameIx(minFIx, minOff, frames, convertBuffer);
+                int minRightOffset = BSTNodeUtil.getRightChildOffset(minFIx, minOff, frames, convertBuffer);
+                BSTNodeUtil.setRightChild(minFIx, minOff, nodeRightChildFrameIx, nodeRightChildOffset, frames);
+                BSTNodeUtil.setLeftChild(minFIx, minOff, nodeLeftChildFrameIx, nodeLeftChildOffset, frames);
+                BSTNodeUtil.setLeftChild(pMinFIx, pMinOff, minRightFIx, minRightOffset, frames);
+            }
 
-		else if (!isNodeNull(nlc_fix, nlc_off)) { // Node has only left child
-			if (status == 0) {
-				BSTNodeUtil.setLeftChild(par_fix, par_off, nlc_fix, nlc_off,
-						frames);
-			} else if (status == 1) {
-				BSTNodeUtil.setRightChild(par_fix, par_off, nlc_fix, nlc_off,
-						frames);
-			} else if (status == -1) { // No parent, so node is root
-				root.set(nlc_fix, nlc_off);
-			}
-			return;
-		}
+            // Now dealing with the parent
+            if (!isNodeNull(parentFrameIx, parentOffset)) {
+                if (status == 0) {
+                    BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, minFIx, minOff, frames);
+                } else if (status == 1) {
+                    BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, minFIx, minOff, frames);
+                }
+            } else { // No parent (node was the root)
+                root.set(minFIx, minOff);
+            }
+            return;
+        }
 
-		else if (!isNodeNull(nrc_fix, nrc_off)) { // Node has only right child
-			if (status == 0) {
-				BSTNodeUtil.setLeftChild(par_fix, par_off, nrc_fix, nrc_off,
-						frames);
-			} else if (status == 1) {
-				BSTNodeUtil.setRightChild(par_fix, par_off, nrc_fix, nrc_off,
-						frames);
-			} else if (status == -1) { // No parent, so node is root
-				root.set(nrc_fix, nrc_off);
-			}
-			return;
-		}
+        else if (!isNodeNull(nodeLeftChildFrameIx, nodeLeftChildOffset)) { // Node
+                                                                           // has
+                                                                           // only
+                                                                           // left
+                                                                           // child
+            if (status == 0) {
+                BSTNodeUtil
+                        .setLeftChild(parentFrameIx, parentOffset, nodeLeftChildFrameIx, nodeLeftChildOffset, frames);
+            } else if (status == 1) {
+                BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, nodeLeftChildFrameIx, nodeLeftChildOffset,
+                        frames);
+            } else if (status == -1) { // No parent, so node is root
+                root.set(nodeLeftChildFrameIx, nodeLeftChildOffset);
+            }
+            return;
+        }
 
-		else { // Node is leaf (no children)
-			if (status == 0) {
-				BSTNodeUtil.setLeftChild(par_fix, par_off,
-						BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX,
-						frames);
-			} else if (status == 1) {
-				BSTNodeUtil.setRightChild(par_fix, par_off,
-						BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX,
-						frames);
-			} else { // node was the only node in the tree
-				root.clear();
-			}
-			return;
-		}
-	}
+        else if (!isNodeNull(nodeRightChildFrameIx, nodeRightChildOffset)) { // Node
+                                                                             // has
+                                                                             // only
+                                                                             // right
+                                                                             // child
+            if (status == 0) {
+                BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, nodeRightChildFrameIx, nodeRightChildOffset,
+                        frames);
+            } else if (status == 1) {
+                BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, nodeRightChildFrameIx, nodeRightChildOffset,
+                        frames);
+            } else if (status == -1) { // No parent, so node is root
+                root.set(nodeRightChildFrameIx, nodeRightChildOffset);
+            }
+            return;
+        }
 
-	/**
-	 * Allocation with no splitting but padding
-	 * 
-	 * @param node
-	 * @param parent
-	 * @param result
-	 *            is the container sent by the caller to hold the results
-	 */
-	private void allocate(Slot node, Slot parent, int length, Slot result) {
-		int nextFix = BSTNodeUtil.next_fIx(node, frames, convertBuffer);
-		int nextOff = BSTNodeUtil.next_offset(node, frames, convertBuffer);
-		if (!isNodeNull(nextFix, nextOff)) {
-			int nextOfNext_fix = BSTNodeUtil.next_fIx(nextFix, nextOff, frames,
-					convertBuffer);
-			int nextOfNext_off = BSTNodeUtil.next_offset(nextFix, nextOff,
-					frames, convertBuffer);
-			BSTNodeUtil.setNext(node.getFrameIx(), node.getOffset(),
-					nextOfNext_fix, nextOfNext_off, frames);
-			if (!isNodeNull(nextOfNext_fix, nextOfNext_off)) {
-				BSTNodeUtil.setPrev(nextOfNext_fix, nextOfNext_off,
-						node.getFrameIx(), node.getOffset(), frames);
-			}
-			BSTNodeUtil
-					.setHeaderFooter(nextFix, nextOff, length, false, frames);
-			result.set(nextFix, nextOff);
-			return;
-		}
+        else { // Node is leaf (no children)
+            if (status == 0) {
+                BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, BSTNodeUtil.INVALID_INDEX,
+                        BSTNodeUtil.INVALID_INDEX, frames);
+            } else if (status == 1) {
+                BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, BSTNodeUtil.INVALID_INDEX,
+                        BSTNodeUtil.INVALID_INDEX, frames);
+            } else { // node was the only node in the tree
+                root.clear();
+            }
+            return;
+        }
+    }
 
-		fixTreePtrs(node.getFrameIx(), node.getOffset(), parent.getFrameIx(),
-				parent.getOffset());
-		BSTNodeUtil.setHeaderFooter(node.getFrameIx(), node.getOffset(),
-				length, false, frames);
-		result.copy(node);
-	}
+    /**
+     * Allocation with no splitting but padding
+     * 
+     * @param node
+     * @param parent
+     * @param result
+     *            is the container sent by the caller to hold the results
+     */
+    private void allocate(Slot node, Slot parent, int length, Slot result) {
+        int nextFix = BSTNodeUtil.getNextFrameIx(node, frames, convertBuffer);
+        int nextOff = BSTNodeUtil.getNextOffset(node, frames, convertBuffer);
+        if (!isNodeNull(nextFix, nextOff)) {
+            int nextOfNextFIx = BSTNodeUtil.getNextFrameIx(nextFix, nextOff, frames, convertBuffer);
+            int nextOfNextOffset = BSTNodeUtil.getNextOffset(nextFix, nextOff, frames, convertBuffer);
+            BSTNodeUtil.setNext(node.getFrameIx(), node.getOffset(), nextOfNextFIx, nextOfNextOffset, frames);
+            if (!isNodeNull(nextOfNextFIx, nextOfNextOffset)) {
+                BSTNodeUtil.setPrev(nextOfNextFIx, nextOfNextOffset, node.getFrameIx(), node.getOffset(), frames);
+            }
+            BSTNodeUtil.setHeaderFooter(nextFix, nextOff, length, false, frames);
+            result.set(nextFix, nextOff);
+            return;
+        }
 
-	private void removeFromList(int fix, int off) {
-		int nx_fix = BSTNodeUtil.next_fIx(fix, off, frames, convertBuffer);
-		int nx_off = BSTNodeUtil.next_offset(fix, off, frames, convertBuffer);
-		int prev_fix = BSTNodeUtil.prev_fIx(fix, off, frames, convertBuffer);
-		int prev_off = BSTNodeUtil.prev_offset(fix, off, frames, convertBuffer);
-		if (!isNodeNull(prev_fix, prev_off) && !isNodeNull(nx_fix, nx_off)) {
-			BSTNodeUtil.setNext(prev_fix, prev_off, nx_fix, nx_off, frames);
-			BSTNodeUtil.setPrev(nx_fix, nx_off, prev_fix, prev_off, frames);
-			BSTNodeUtil.setNext(fix, off, BSTNodeUtil.INVALID_INDEX,
-					BSTNodeUtil.INVALID_INDEX, frames);
-			BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX,
-					BSTNodeUtil.INVALID_INDEX, frames);
-			return;
-		}
-		if (!isNodeNull(prev_fix, prev_off)) {
-			BSTNodeUtil.setNext(prev_fix, prev_off, BSTNodeUtil.INVALID_INDEX,
-					BSTNodeUtil.INVALID_INDEX, frames);
-			BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX,
-					BSTNodeUtil.INVALID_INDEX, frames);
-			return;
-		}
+        fixTreePtrs(node.getFrameIx(), node.getOffset(), parent.getFrameIx(), parent.getOffset());
+        BSTNodeUtil.setHeaderFooter(node.getFrameIx(), node.getOffset(), length, false, frames);
+        result.copy(node);
+    }
 
-		// We need to find the parent, so we can fix the tree
-		int par_fix = BSTNodeUtil.INVALID_INDEX;
-		int par_off = BSTNodeUtil.INVALID_INDEX;
-		int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(fix,
-				off, frames, convertBuffer));
-		fix = root.getFrameIx();
-		off = root.getOffset();
-		int curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
-		while (length != curLen) {
-			par_fix = fix;
-			par_off = off;
-			if (length < curLen) {
-				fix = BSTNodeUtil.leftChild_fIx(par_fix, par_off, frames,
-						convertBuffer); // par_fix is now the old(current) fix
-				off = BSTNodeUtil.leftChild_offset(par_fix, par_off, frames,
-						convertBuffer); // par_off is now the old(current) off
-			} else {
-				fix = BSTNodeUtil.rightChild_fIx(par_fix, par_off, frames,
-						convertBuffer); // par_fix is now the old(current) fix
-				off = BSTNodeUtil.rightChild_offset(par_fix, par_off, frames,
-						convertBuffer); // par_off is now the old(current) off
-			}
-			curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
-		}
+    private void removeFromList(int fix, int off) {
+        int nextFIx = BSTNodeUtil.getNextFrameIx(fix, off, frames, convertBuffer);
+        int nextOffset = BSTNodeUtil.getNextOffset(fix, off, frames, convertBuffer);
+        int prevFIx = BSTNodeUtil.getPrevFrameIx(fix, off, frames, convertBuffer);
+        int prevOffset = BSTNodeUtil.getPrevOffset(fix, off, frames, convertBuffer);
+        if (!isNodeNull(prevFIx, prevOffset) && !isNodeNull(nextFIx, nextOffset)) {
+            BSTNodeUtil.setNext(prevFIx, prevOffset, nextFIx, nextOffset, frames);
+            BSTNodeUtil.setPrev(nextFIx, nextOffset, prevFIx, prevOffset, frames);
+            BSTNodeUtil.setNext(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+            BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+            return;
+        }
+        if (!isNodeNull(prevFIx, prevOffset)) {
+            BSTNodeUtil.setNext(prevFIx, prevOffset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+            BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+            return;
+        }
 
-		if (!isNodeNull(nx_fix, nx_off)) { // it is head of the list (in the
-											// tree)
-			BSTNodeUtil.setPrev(nx_fix, nx_off, BSTNodeUtil.INVALID_INDEX,
-					BSTNodeUtil.INVALID_INDEX, frames);
-			int node_lc_fix = BSTNodeUtil.leftChild_fIx(fix, off, frames,
-					convertBuffer);
-			int node_lc_off = BSTNodeUtil.leftChild_offset(fix, off, frames,
-					convertBuffer);
-			int node_rc_fix = BSTNodeUtil.rightChild_fIx(fix, off, frames,
-					convertBuffer);
-			int node_rc_off = BSTNodeUtil.rightChild_offset(fix, off, frames,
-					convertBuffer);
-			BSTNodeUtil.setLeftChild(nx_fix, nx_off, node_lc_fix, node_lc_off,
-					frames);
-			BSTNodeUtil.setRightChild(nx_fix, nx_off, node_rc_fix, node_rc_off,
-					frames);
-			if (!isNodeNull(par_fix, par_off)) {
-				int parentLength = BSTNodeUtil.getLength(par_fix, par_off,
-						frames, convertBuffer);
-				if (length < parentLength) {
-					BSTNodeUtil.setLeftChild(par_fix, par_off, nx_fix, nx_off,
-							frames);
-				} else {
-					BSTNodeUtil.setRightChild(par_fix, par_off, nx_fix, nx_off,
-							frames);
-				}
-			}
+        // We need to find the parent, so we can fix the tree
+        int parentFIx = BSTNodeUtil.INVALID_INDEX;
+        int parentOffset = BSTNodeUtil.INVALID_INDEX;
+        int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(fix, off, frames, convertBuffer));
+        fix = root.getFrameIx();
+        off = root.getOffset();
+        int curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+        while (length != curLen) {
+            parentFIx = fix;
+            parentOffset = off;
+            if (length < curLen) {
+                fix = BSTNodeUtil.getLeftChildFrameIx(parentFIx, parentOffset, frames, convertBuffer); // parentFIx
+                // is
+                // now
+                // the
+                // old(current)
+                // fix
+                off = BSTNodeUtil.getLeftChildOffset(parentFIx, parentOffset, frames, convertBuffer); // parentOffset
+                // is
+                // now
+                // the
+                // old(current)
+                // off
+            } else {
+                fix = BSTNodeUtil.getRightChildFrameIx(parentFIx, parentOffset, frames, convertBuffer); // parentFIx
+                // is
+                // now
+                // the
+                // old(current)
+                // fix
+                off = BSTNodeUtil.getRightChildOffset(parentFIx, parentOffset, frames, convertBuffer); // parentOffset
+                // is
+                // now
+                // the
+                // old(current)
+                // off
+            }
+            curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+        }
 
-			if ((root.getFrameIx() == fix) && (root.getOffset() == off)) {
-				root.set(nx_fix, nx_off);
-			}
+        if (!isNodeNull(nextFIx, nextOffset)) { // it is head of the list (in
+                                                // the
+            // tree)
+            BSTNodeUtil.setPrev(nextFIx, nextOffset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+            int nodeLeftChildFIx = BSTNodeUtil.getLeftChildFrameIx(fix, off, frames, convertBuffer);
+            int nodeLeftChildOffset = BSTNodeUtil.getLeftChildOffset(fix, off, frames, convertBuffer);
+            int nodeRightChildFix = BSTNodeUtil.getRightChildFrameIx(fix, off, frames, convertBuffer);
+            int nodeRightChildOffset = BSTNodeUtil.getRightChildOffset(fix, off, frames, convertBuffer);
+            BSTNodeUtil.setLeftChild(nextFIx, nextOffset, nodeLeftChildFIx, nodeLeftChildOffset, frames);
+            BSTNodeUtil.setRightChild(nextFIx, nextOffset, nodeRightChildFix, nodeRightChildOffset, frames);
+            if (!isNodeNull(parentFIx, parentOffset)) {
+                int parentLength = BSTNodeUtil.getLength(parentFIx, parentOffset, frames, convertBuffer);
+                if (length < parentLength) {
+                    BSTNodeUtil.setLeftChild(parentFIx, parentOffset, nextFIx, nextOffset, frames);
+                } else {
+                    BSTNodeUtil.setRightChild(parentFIx, parentOffset, nextFIx, nextOffset, frames);
+                }
+            }
 
-			return;
-		}
+            if ((root.getFrameIx() == fix) && (root.getOffset() == off)) {
+                root.set(nextFIx, nextOffset);
+            }
 
-		fixTreePtrs(fix, off, par_fix, par_off);
-	}
+            return;
+        }
 
-	private void clear(Slot[] s) {
-		s[0].clear();
-		s[1].clear();
-	}
+        fixTreePtrs(fix, off, parentFIx, parentOffset);
+    }
 
-	private boolean isNodeNull(int frameIx, int offset) {
-		return ((frameIx == BSTNodeUtil.INVALID_INDEX)
-				|| (offset == BSTNodeUtil.INVALID_INDEX) || (frames[frameIx] == null));
-	}
+    private void clear(Slot[] s) {
+        s[0].clear();
+        s[1].clear();
+    }
 
-	private boolean shouldSplit(int slotLength, int reqLength) {
-		return ((slotLength - reqLength) >= BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE);
-	}
+    private boolean isNodeNull(int frameIx, int offset) {
+        return ((frameIx == BSTNodeUtil.INVALID_INDEX) || (offset == BSTNodeUtil.INVALID_INDEX) || (frames[frameIx] == null));
+    }
 
-	private void initNewNode(int frameIx, int offset) {
-		BSTNodeUtil.setLeftChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
-				BSTNodeUtil.INVALID_INDEX, frames);
-		BSTNodeUtil.setRightChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
-				BSTNodeUtil.INVALID_INDEX, frames);
-		BSTNodeUtil.setNext(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
-				BSTNodeUtil.INVALID_INDEX, frames);
-		BSTNodeUtil.setPrev(frameIx, offset, BSTNodeUtil.INVALID_INDEX,
-				BSTNodeUtil.INVALID_INDEX, frames);
-	}
+    private boolean shouldSplit(int slotLength, int reqLength) {
+        return ((slotLength - reqLength) >= BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE);
+    }
 
-	private ByteBuffer allocateFrame() {
-		return ctx.allocateFrame();
-	}
+    private void initNewNode(int frameIx, int offset) {
+        BSTNodeUtil.setLeftChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+        BSTNodeUtil.setRightChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+        BSTNodeUtil.setNext(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+        BSTNodeUtil.setPrev(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
+    }
 
-	public String _debug_printMemory() {
-		_debug_free_slots = 0;
-		Slot s = new Slot(0, 0);
-		if (s.isNull()) {
-			return "memory:\tNull";
-		}
+    private ByteBuffer allocateFrame() {
+        return ctx.allocateFrame();
+    }
 
-		if (BSTNodeUtil.isFree(0, 0, frames)) {
-			_debug_free_slots++;
-		}
+    public String debugPrintMemory() {
+        debugFreeSlots = 0;
+        Slot s = new Slot(0, 0);
+        if (s.isNull()) {
+            return "memory:\tNull";
+        }
 
-		String m = "memory:\n" + _debug_printSlot(0, 0) + "\n";
-		int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(0, 0,
-				frames, convertBuffer));
-		int noff = (length + 2 * BSTNodeUtil.HEADER_SIZE >= FRAME_SIZE ? BSTNodeUtil.INVALID_INDEX
-				: length + 2 * BSTNodeUtil.HEADER_SIZE);
-		int nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length == 1) ? BSTNodeUtil.INVALID_INDEX
-				: 1)
-				: 0);
-		if (noff == BSTNodeUtil.INVALID_INDEX
-				&& nfix != BSTNodeUtil.INVALID_INDEX) {
-			noff = 0;
-		}
-		s.set(nfix, noff);
-		while (!isNodeNull(s.getFrameIx(), s.getOffset())) {
-			if (BSTNodeUtil.isFree(s.getFrameIx(), s.getOffset(), frames)) {
-				_debug_free_slots++;
-			}
-			m += _debug_printSlot(s.getFrameIx(), s.getOffset()) + "\n";
-			length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(
-					s.getFrameIx(), s.getOffset(), frames, convertBuffer));
-			noff = (s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE >= FRAME_SIZE ? BSTNodeUtil.INVALID_INDEX
-					: s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE);
-			nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length - 1 == s
-					.getFrameIx()) ? BSTNodeUtil.INVALID_INDEX
-					: s.getFrameIx() + 1) : s.getFrameIx());
-			if (noff == BSTNodeUtil.INVALID_INDEX
-					&& nfix != BSTNodeUtil.INVALID_INDEX) {
-				noff = 0;
-			}
-			s.set(nfix, noff);
-		}
-		return m + "\nFree Slots:\t" + _debug_free_slots;
-	}
+        if (BSTNodeUtil.isFree(0, 0, frames)) {
+            debugFreeSlots++;
+        }
 
-	public String _debug_printTree() {
-		_debug_tree_size = 0;
-		Slot node = new Slot();
-		node.copy(root);
-		if (!node.isNull()) {
-			_debug_tree_size++;
-			return _debug_printSubTree(node) + "\nTree Nodes:\t"
-					+ _debug_tree_size;
-		}
-		return "Null";
-	}
+        String m = "memory:\n" + debugPrintSlot(0, 0) + "\n";
+        int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(0, 0, frames, convertBuffer));
+        int noff = (length + 2 * BSTNodeUtil.HEADER_SIZE >= frameSize ? BSTNodeUtil.INVALID_INDEX : length + 2
+                * BSTNodeUtil.HEADER_SIZE);
+        int nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length == 1) ? BSTNodeUtil.INVALID_INDEX : 1) : 0);
+        if (noff == BSTNodeUtil.INVALID_INDEX && nfix != BSTNodeUtil.INVALID_INDEX) {
+            noff = 0;
+        }
+        s.set(nfix, noff);
+        while (!isNodeNull(s.getFrameIx(), s.getOffset())) {
+            if (BSTNodeUtil.isFree(s.getFrameIx(), s.getOffset(), frames)) {
+                debugFreeSlots++;
+            }
+            m += debugPrintSlot(s.getFrameIx(), s.getOffset()) + "\n";
+            length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(s.getFrameIx(), s.getOffset(), frames,
+                    convertBuffer));
+            noff = (s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE >= frameSize ? BSTNodeUtil.INVALID_INDEX : s
+                    .getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE);
+            nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length - 1 == s.getFrameIx()) ? BSTNodeUtil.INVALID_INDEX
+                    : s.getFrameIx() + 1)
+                    : s.getFrameIx());
+            if (noff == BSTNodeUtil.INVALID_INDEX && nfix != BSTNodeUtil.INVALID_INDEX) {
+                noff = 0;
+            }
+            s.set(nfix, noff);
+        }
+        return m + "\nFree Slots:\t" + debugFreeSlots;
+    }
 
-	private String _debug_printSubTree(Slot r) {
-		Slot node = new Slot();
-		node.copy(r);
-		int fix = node.getFrameIx();
-		int off = node.getOffset();
-		int lfix = BSTNodeUtil.leftChild_fIx(node, frames, convertBuffer);
-		int loff = BSTNodeUtil.leftChild_offset(node, frames, convertBuffer);
-		int rfix = BSTNodeUtil.rightChild_fIx(node, frames, convertBuffer);
-		int roff = BSTNodeUtil.rightChild_offset(node, frames, convertBuffer);
-		int nfix = BSTNodeUtil.next_fIx(node, frames, convertBuffer);
-		int noff = BSTNodeUtil.next_offset(node, frames, convertBuffer);
-		int pfix = BSTNodeUtil.prev_fIx(node, frames, convertBuffer);
-		int poff = BSTNodeUtil.prev_offset(node, frames, convertBuffer);
+    public String debugPrintTree() {
+        debugTreeSize = 0;
+        Slot node = new Slot();
+        node.copy(root);
+        if (!node.isNull()) {
+            debugTreeSize++;
+            return debugPrintSubTree(node) + "\nTree Nodes:\t" + debugTreeSize;
+        }
+        return "Null";
+    }
 
-		String s = "{" + r.getFrameIx() + ", " + r.getOffset() + " (Len: "
-				+ BSTNodeUtil.getLength(fix, off, frames, convertBuffer)
-				+ ") - " + "(LC: " + _debug_printSlot(lfix, loff) + ") - "
-				+ "(RC: " + _debug_printSlot(rfix, roff) + ") - " + "(NX: "
-				+ _debug_printSlot(nfix, noff) + ") - " + "(PR: "
-				+ _debug_printSlot(pfix, poff) + ")  }\n";
-		if (!isNodeNull(lfix, loff)) {
-			_debug_tree_size++;
-			s += _debug_printSubTree(new Slot(lfix, loff)) + "\n";
-		}
-		if (!isNodeNull(rfix, roff)) {
-			_debug_tree_size++;
-			s += _debug_printSubTree(new Slot(rfix, roff)) + "\n";
-		}
+    private String debugPrintSubTree(Slot r) {
+        Slot node = new Slot();
+        node.copy(r);
+        int fix = node.getFrameIx();
+        int off = node.getOffset();
+        int lfix = BSTNodeUtil.getLeftChildFrameIx(node, frames, convertBuffer);
+        int loff = BSTNodeUtil.getLeftChildOffset(node, frames, convertBuffer);
+        int rfix = BSTNodeUtil.getRightChildFrameIx(node, frames, convertBuffer);
+        int roff = BSTNodeUtil.getRightChildOffset(node, frames, convertBuffer);
+        int nfix = BSTNodeUtil.getNextFrameIx(node, frames, convertBuffer);
+        int noff = BSTNodeUtil.getNextOffset(node, frames, convertBuffer);
+        int pfix = BSTNodeUtil.getPrevFrameIx(node, frames, convertBuffer);
+        int poff = BSTNodeUtil.getPrevOffset(node, frames, convertBuffer);
 
-		return s;
-	}
+        String s = "{" + r.getFrameIx() + ", " + r.getOffset() + " (Len: "
+                + BSTNodeUtil.getLength(fix, off, frames, convertBuffer) + ") - " + "(LC: "
+                + debugPrintSlot(lfix, loff) + ") - " + "(RC: " + debugPrintSlot(rfix, roff) + ") - " + "(NX: "
+                + debugPrintSlot(nfix, noff) + ") - " + "(PR: " + debugPrintSlot(pfix, poff) + ")  }\n";
+        if (!isNodeNull(lfix, loff)) {
+            debugTreeSize++;
+            s += debugPrintSubTree(new Slot(lfix, loff)) + "\n";
+        }
+        if (!isNodeNull(rfix, roff)) {
+            debugTreeSize++;
+            s += debugPrintSubTree(new Slot(rfix, roff)) + "\n";
+        }
 
-	private String _debug_printSlot(int fix, int off) {
-		if (isNodeNull(fix, off)) {
-			return BSTNodeUtil.INVALID_INDEX + ", " + BSTNodeUtil.INVALID_INDEX;
-		}
-		int l = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
-		int al = BSTNodeUtil.getActualLength(l);
-		boolean f = BSTNodeUtil.isFree(fix, off, frames);
-		return fix + ", " + off + " (free: " + f + ") (Len: " + l
-				+ ") (actual len: " + al + ") ";
-	}
+        return s;
+    }
+
+    private String debugPrintSlot(int fix, int off) {
+        if (isNodeNull(fix, off)) {
+            return BSTNodeUtil.INVALID_INDEX + ", " + BSTNodeUtil.INVALID_INDEX;
+        }
+        int l = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
+        int al = BSTNodeUtil.getActualLength(l);
+        boolean f = BSTNodeUtil.isFree(fix, off, frames);
+        return fix + ", " + off + " (free: " + f + ") (Len: " + l + ") (actual len: " + al + ") ";
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
index dc1bffa..0294dc0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
@@ -8,270 +8,214 @@
  *         Implements utility methods, used extensively and repeatedly within
  *         the BSTMemMgr.
  * 
- *         Main functionality includes methods to set/get different types of
- *         pointers, required and accessed within BST traversal, along with the
- *         methods for setting/getting length/header/footer of free slots, which
- *         have been used as the containers for BST nodes.
+ *         Mainly includes methods to set/get different types of pointers,
+ *         required and accessed within BST traversal, along with the methods
+ *         for setting/getting length/header/footer of free slots, which have
+ *         been used as the containers for BST nodes.
  */
 public class BSTNodeUtil {
 
-	static final int TOTAL_FRAME_SIZE = BSTMemMgr.FRAME_SIZE;
-	static final int MINIMUM_FREE_SLOT_SIZE = 32;
+    static final int MINIMUM_FREE_SLOT_SIZE = 32;
 
-	static final int FRAME_PTR_SIZE = 4;
-	static final int OFFSET_SIZE = 2;
+    private static final int FRAME_PTR_SIZE = 4;
+    private static final int OFFSET_SIZE = 2;
 
-	static final int HEADER_SIZE = 2;
-	static final int HEADER_INDEX = 0;
+    static final int HEADER_SIZE = 2;
+    private static final int HEADER_INDEX = 0;
 
-	static final int LEFT_CHILD_FRAME_INDEX = HEADER_INDEX + HEADER_SIZE;
-	static final int LEFT_CHILD_OFFSET_INDEX = LEFT_CHILD_FRAME_INDEX
-			+ FRAME_PTR_SIZE;
+    private static final int LEFT_CHILD_FRAME_INDEX = HEADER_INDEX + HEADER_SIZE;
+    private static final int LEFT_CHILD_OFFSET_INDEX = LEFT_CHILD_FRAME_INDEX + FRAME_PTR_SIZE;
 
-	static final int RIGHT_CHILD_FRAME_INDEX = LEFT_CHILD_OFFSET_INDEX
-			+ OFFSET_SIZE;
-	static final int RIGHT_CHILD_OFFSET_INDEX = RIGHT_CHILD_FRAME_INDEX
-			+ FRAME_PTR_SIZE;
+    private static final int RIGHT_CHILD_FRAME_INDEX = LEFT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
+    private static final int RIGHT_CHILD_OFFSET_INDEX = RIGHT_CHILD_FRAME_INDEX + FRAME_PTR_SIZE;
 
-	static final int NEXT_FRAME_INDEX = RIGHT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
-	static final int NEXT_OFFSET_INDEX = NEXT_FRAME_INDEX + FRAME_PTR_SIZE;
+    private static final int NEXT_FRAME_INDEX = RIGHT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
+    private static final int NEXT_OFFSET_INDEX = NEXT_FRAME_INDEX + FRAME_PTR_SIZE;
 
-	static final int PREV_FRAME_INDEX = NEXT_OFFSET_INDEX + OFFSET_SIZE;
-	static final int PREV_OFFSET_INDEX = PREV_FRAME_INDEX + FRAME_PTR_SIZE;
+    private static final int PREV_FRAME_INDEX = NEXT_OFFSET_INDEX + OFFSET_SIZE;
+    private static final int PREV_OFFSET_INDEX = PREV_FRAME_INDEX + FRAME_PTR_SIZE;
 
-	private static final byte INVALID = -128;
-	private static final byte MASK = 127;
-	static final int INVALID_INDEX = -1;
+    private static final byte INVALID = -128;
+    private static final byte MASK = 127;
+    static final int INVALID_INDEX = -1;
 
-	/*
-	 * Structure of a free slot:
-	 * [HEADER][LEFT_CHILD][RIGHT_CHILD][NEXT][PREV]...[FOOTER] MSB in the
-	 * HEADER is set to 1 (invalid content)
-	 * 
-	 * Structure of a used slot: [HEADER]...[FOOTER] MSB in the HEADER is set to
-	 * 0 (valid content)
-	 */
+    /*
+     * Structure of a free slot:
+     * [HEADER][LEFT_CHILD][RIGHT_CHILD][NEXT][PREV]...[FOOTER] MSB in the
+     * HEADER is set to 1 in a free slot
+     * 
+     * Structure of a used slot: [HEADER]...[FOOTER] MSB in the HEADER is set to
+     * 0 in a used slot
+     */
 
-	static int leftChild_fIx(Slot s, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return leftChild_fIx(s.getFrameIx(), s.getOffset(), frames,
-				convertBuffer);
-	}
+    static int getLeftChildFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return getLeftChildFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+    }
 
-	static int leftChild_offset(Slot s, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return leftChild_offset(s.getFrameIx(), s.getOffset(), frames,
-				convertBuffer);
-	}
+    static int getLeftChildOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return getLeftChildOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+    }
 
-	static int leftChild_fIx(int frameIx, int offset, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_FRAME_INDEX,
-				FRAME_PTR_SIZE, convertBuffer));
+    static int getLeftChildFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
 
-	}
+    }
 
-	static int leftChild_offset(int frameIx, int offset, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return (retrieveAsInt(frames[frameIx],
-				offset + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
-	}
+    static int getLeftChildOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+    }
 
-	static void setLeftChild(Slot node, Slot lc, ByteBuffer[] frames) {
-		setLeftChild(node.getFrameIx(), node.getOffset(), lc.getFrameIx(),
-				lc.getOffset(), frames);
-	}
+    static void setLeftChild(Slot node, Slot lc, ByteBuffer[] frames) {
+        setLeftChild(node.getFrameIx(), node.getOffset(), lc.getFrameIx(), lc.getOffset(), frames);
+    }
 
-	static void setLeftChild(int nodeFix, int nodeOff, int lcFix, int lcOff,
-			ByteBuffer[] frames) {
-		storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_FRAME_INDEX,
-				FRAME_PTR_SIZE, lcFix);
-		storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_OFFSET_INDEX,
-				OFFSET_SIZE, lcOff);
-	}
+    static void setLeftChild(int nodeFix, int nodeOff, int lcFix, int lcOff, ByteBuffer[] frames) {
+        storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, lcFix);
+        storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, lcOff);
+    }
 
-	static int rightChild_fIx(Slot s, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return rightChild_fIx(s.getFrameIx(), s.getOffset(), frames,
-				convertBuffer);
-	}
+    static int getRightChildFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return getRightChildFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+    }
 
-	static int rightChild_offset(Slot s, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return rightChild_offset(s.getFrameIx(), s.getOffset(), frames,
-				convertBuffer);
-	}
+    static int getRightChildOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return getRightChildOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+    }
 
-	static int rightChild_fIx(int frameIx, int offset, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return (retrieveAsInt(frames[frameIx],
-				offset + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
-	}
+    static int getRightChildFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return (retrieveAsInt(frames[frameIx], offset + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
+    }
 
-	static int rightChild_offset(int frameIx, int offset, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return (retrieveAsInt(frames[frameIx], offset
-				+ RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
-	}
+    static int getRightChildOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return (retrieveAsInt(frames[frameIx], offset + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+    }
 
-	static void setRightChild(Slot node, Slot rc, ByteBuffer[] frames) {
-		setRightChild(node.getFrameIx(), node.getOffset(), rc.getFrameIx(),
-				rc.getOffset(), frames);
-	}
+    static void setRightChild(Slot node, Slot rc, ByteBuffer[] frames) {
+        setRightChild(node.getFrameIx(), node.getOffset(), rc.getFrameIx(), rc.getOffset(), frames);
+    }
 
-	static void setRightChild(int nodeFix, int nodeOff, int rcFix, int rcOff,
-			ByteBuffer[] frames) {
-		storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_FRAME_INDEX,
-				FRAME_PTR_SIZE, rcFix);
-		storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_OFFSET_INDEX,
-				OFFSET_SIZE, rcOff);
-	}
+    static void setRightChild(int nodeFix, int nodeOff, int rcFix, int rcOff, ByteBuffer[] frames) {
+        storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, rcFix);
+        storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, rcOff);
+    }
 
-	static int next_fIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-		return next_fIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-	}
+    static int getNextFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return getNextFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+    }
 
-	static int next_offset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-		return next_offset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-	}
+    static int getNextOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return getNextOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+    }
 
-	static int next_fIx(int frameIx, int offset, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return (retrieveAsInt(frames[frameIx], offset + NEXT_FRAME_INDEX,
-				FRAME_PTR_SIZE, convertBuffer));
-	}
+    static int getNextFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return (retrieveAsInt(frames[frameIx], offset + NEXT_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
+    }
 
-	static int next_offset(int frameIx, int offset, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return (retrieveAsInt(frames[frameIx], offset + NEXT_OFFSET_INDEX,
-				OFFSET_SIZE, convertBuffer));
-	}
+    static int getNextOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return (retrieveAsInt(frames[frameIx], offset + NEXT_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+    }
 
-	static void setNext(Slot node, Slot next, ByteBuffer[] frames) {
-		setNext(node.getFrameIx(), node.getOffset(), next.getFrameIx(),
-				node.getOffset(), frames);
-	}
+    static void setNext(Slot node, Slot next, ByteBuffer[] frames) {
+        setNext(node.getFrameIx(), node.getOffset(), next.getFrameIx(), node.getOffset(), frames);
+    }
 
-	static void setNext(int nodeFix, int nodeOff, int nFix, int nOff,
-			ByteBuffer[] frames) {
-		storeInt(frames[nodeFix], nodeOff + NEXT_FRAME_INDEX, FRAME_PTR_SIZE,
-				nFix);
-		storeInt(frames[nodeFix], nodeOff + NEXT_OFFSET_INDEX, OFFSET_SIZE,
-				nOff);
-	}
+    static void setNext(int nodeFix, int nodeOff, int nFix, int nOff, ByteBuffer[] frames) {
+        storeInt(frames[nodeFix], nodeOff + NEXT_FRAME_INDEX, FRAME_PTR_SIZE, nFix);
+        storeInt(frames[nodeFix], nodeOff + NEXT_OFFSET_INDEX, OFFSET_SIZE, nOff);
+    }
 
-	static int prev_fIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-		return prev_fIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-	}
+    static int getPrevFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return getPrevFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+    }
 
-	static int prev_offset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-		return prev_offset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-	}
+    static int getPrevOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return getPrevOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+    }
 
-	static int prev_fIx(int frameIx, int offset, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return (retrieveAsInt(frames[frameIx], offset + PREV_FRAME_INDEX,
-				FRAME_PTR_SIZE, convertBuffer));
-	}
+    static int getPrevFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return (retrieveAsInt(frames[frameIx], offset + PREV_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
+    }
 
-	static int prev_offset(int frameIx, int offset, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		return (retrieveAsInt(frames[frameIx], offset + PREV_OFFSET_INDEX,
-				OFFSET_SIZE, convertBuffer));
-	}
+    static int getPrevOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return (retrieveAsInt(frames[frameIx], offset + PREV_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
+    }
 
-	static void setPrev(Slot node, Slot prev, ByteBuffer[] frames) {
-		setPrev(node.getFrameIx(), node.getOffset(), prev.getFrameIx(),
-				prev.getOffset(), frames);
-	}
+    static void setPrev(Slot node, Slot prev, ByteBuffer[] frames) {
+        setPrev(node.getFrameIx(), node.getOffset(), prev.getFrameIx(), prev.getOffset(), frames);
+    }
 
-	static void setPrev(int nodeFix, int nodeOff, int pFix, int pOff,
-			ByteBuffer[] frames) {
-		storeInt(frames[nodeFix], nodeOff + PREV_FRAME_INDEX, FRAME_PTR_SIZE,
-				pFix);
-		storeInt(frames[nodeFix], nodeOff + PREV_OFFSET_INDEX, OFFSET_SIZE,
-				pOff);
-	}
+    static void setPrev(int nodeFix, int nodeOff, int pFix, int pOff, ByteBuffer[] frames) {
+        storeInt(frames[nodeFix], nodeOff + PREV_FRAME_INDEX, FRAME_PTR_SIZE, pFix);
+        storeInt(frames[nodeFix], nodeOff + PREV_OFFSET_INDEX, OFFSET_SIZE, pOff);
+    }
 
-	static boolean slotsTheSame(Slot s, Slot t) {
-		return ((s.getFrameIx() == t.getFrameIx()) && (s.getOffset() == t
-				.getOffset()));
-	}
+    static boolean slotsTheSame(Slot s, Slot t) {
+        return ((s.getFrameIx() == t.getFrameIx()) && (s.getOffset() == t.getOffset()));
+    }
 
-	static void setHeaderFooter(int frameIx, int offset, int usedLength,
-			boolean isFree, ByteBuffer[] frames) {
-		int slotLength = getActualLength(usedLength);
-		int footerOffset = offset + HEADER_SIZE + slotLength;
-		storeInt(frames[frameIx], offset, HEADER_SIZE, usedLength);
-		storeInt(frames[frameIx], footerOffset, HEADER_SIZE, usedLength);
-		setFree(frameIx, offset, isFree, frames);
-		setFree(frameIx, footerOffset, isFree, frames);
-	}
+    static void setHeaderFooter(int frameIx, int offset, int usedLength, boolean isFree, ByteBuffer[] frames) {
+        int slotLength = getActualLength(usedLength);
+        int footerOffset = offset + HEADER_SIZE + slotLength;
+        storeInt(frames[frameIx], offset, HEADER_SIZE, usedLength);
+        storeInt(frames[frameIx], footerOffset, HEADER_SIZE, usedLength);
+        setFree(frameIx, offset, isFree, frames);
+        setFree(frameIx, footerOffset, isFree, frames);
+    }
 
-	static int getLength(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-		return getLength(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-	}
+    static int getLength(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        return getLength(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
+    }
 
-	static int getLength(int frameIx, int offset, ByteBuffer[] frames,
-			ByteBuffer convertBuffer) {
-		convertBuffer.clear();
-		for (int i = 0; i < 4 - HEADER_SIZE; i++) { // padding
-			convertBuffer.put(i, (byte) 0x00);
-		}
+    static int getLength(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
+        convertBuffer.clear();
+        for (int i = 0; i < 4 - HEADER_SIZE; i++) { // padding
+            convertBuffer.put(i, (byte) 0x00);
+        }
 
-		convertBuffer.put(4 - HEADER_SIZE,
-				(byte) ((frames[frameIx].get(offset)) & (MASK)));
-		System.arraycopy(frames[frameIx].array(), offset + 1,
-				convertBuffer.array(), 5 - HEADER_SIZE, HEADER_SIZE - 1);
-		return convertBuffer.getInt(0);
-	}
+        convertBuffer.put(4 - HEADER_SIZE, (byte) ((frames[frameIx].get(offset)) & (MASK)));
+        System.arraycopy(frames[frameIx].array(), offset + 1, convertBuffer.array(), 5 - HEADER_SIZE, HEADER_SIZE - 1);
+        return convertBuffer.getInt(0);
+    }
 
-	// MSB equal to 1 means FREE
-	static boolean isFree(int frameIx, int offset, ByteBuffer[] frames) {
-		return ((((frames[frameIx]).array()[offset]) & 0x80) == 0x80);
-	}
+    // MSB equal to 1 means FREE
+    static boolean isFree(int frameIx, int offset, ByteBuffer[] frames) {
+        return ((((frames[frameIx]).array()[offset]) & 0x80) == 0x80);
+    }
 
-	static void setFree(int frameIx, int offset, boolean free,
-			ByteBuffer[] frames) {
-		if (free) { // set MSB to 1 (for free)
-			frames[frameIx].put(offset,
-					(byte) (((frames[frameIx]).array()[offset]) | 0x80));
-		} else { // set MSB to 0 (for occupied)
-			frames[frameIx].put(offset,
-					(byte) (((frames[frameIx]).array()[offset]) & 0x7F));
-		}
-	}
+    static void setFree(int frameIx, int offset, boolean free, ByteBuffer[] frames) {
+        if (free) { // set MSB to 1 (for free)
+            frames[frameIx].put(offset, (byte) (((frames[frameIx]).array()[offset]) | 0x80));
+        } else { // set MSB to 0 (for used)
+            frames[frameIx].put(offset, (byte) (((frames[frameIx]).array()[offset]) & 0x7F));
+        }
+    }
 
-	static int getActualLength(int l) {
-		int r = (l + 2 * HEADER_SIZE) % MINIMUM_FREE_SLOT_SIZE;
-		return (r == 0 ? l : (l + (BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE - r)));
-	}
+    static int getActualLength(int l) {
+        int r = (l + 2 * HEADER_SIZE) % MINIMUM_FREE_SLOT_SIZE;
+        return (r == 0 ? l : (l + (BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE - r)));
+    }
 
-	private static int retrieveAsInt(ByteBuffer b, int fromIndex, int size,
-			ByteBuffer convertBuffer) {
-		if ((b.get(fromIndex) & INVALID) == INVALID) {
-			return INVALID_INDEX;
-		}
+    private static int retrieveAsInt(ByteBuffer b, int fromIndex, int size, ByteBuffer convertBuffer) {
+        if ((b.get(fromIndex) & INVALID) == INVALID) {
+            return INVALID_INDEX;
+        }
 
-		convertBuffer.clear();
-		for (int i = 0; i < 4 - size; i++) { // padding
-			convertBuffer.put(i, (byte) 0x00);
-		}
+        convertBuffer.clear();
+        for (int i = 0; i < 4 - size; i++) { // padding
+            convertBuffer.put(i, (byte) 0x00);
+        }
 
-		System.arraycopy(b.array(), fromIndex, convertBuffer.array(), 4 - size,
-				size);
-		return convertBuffer.getInt(0);
-	}
+        System.arraycopy(b.array(), fromIndex, convertBuffer.array(), 4 - size, size);
+        return convertBuffer.getInt(0);
+    }
 
-	private static void storeInt(ByteBuffer b, int fromIndex, int size,
-			int value) {
-		if (value == INVALID_INDEX) {
-			b.put(fromIndex, INVALID);
-			return;
-		}
-		for (int i = 0; i < size; i++) {
-			b.put(fromIndex + i,
-					(byte) ((value >>> (8 * ((size - 1 - i)))) & 0xff));
-		}
-	}
+    private static void storeInt(ByteBuffer b, int fromIndex, int size, int value) {
+        if (value == INVALID_INDEX) {
+            b.put(fromIndex, INVALID);
+            return;
+        }
+        for (int i = 0; i < size; i++) {
+            b.put(fromIndex + i, (byte) ((value >>> (8 * ((size - 1 - i)))) & 0xff));
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
index 1cc15cf..b45c5f9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
@@ -15,57 +15,56 @@
 
 public interface IMemoryManager {
 
-	/**
-	 * Allocates a free slot equal or greater than requested length. Pointer to
-	 * the allocated slot is put in result, and gets returned to the caller. If
-	 * no proper free slot is available, result would contain the null/invalid
-	 * pointer (may vary between different implementations)
-	 * 
-	 * @param length
-	 * @param result
-	 * @throws HyracksDataException
-	 */
-	void allocate(int length, Slot result) throws HyracksDataException;
+    /**
+     * Allocates a free slot equal or greater than requested length. Pointer to
+     * the allocated slot is put in result, and gets returned to the caller. If
+     * no proper free slot is available, result would contain a null/invalid
+     * pointer (may vary between different implementations)
+     * 
+     * @param length
+     * @param result
+     * @throws HyracksDataException
+     */
+    void allocate(int length, Slot result) throws HyracksDataException;
 
-	/**
-	 * Unallocates the specified slot (and returns it back to the free slots
-	 * set)
-	 * 
-	 * @param s
-	 * @return the total length of unallocted slot
-	 * @throws HyracksDataException
-	 */
-	int unallocate(Slot s) throws HyracksDataException;
+    /**
+     * Unallocates the specified slot (and returns it back to the free slots
+     * set)
+     * 
+     * @param s
+     * @return the total length of unallocted slot
+     * @throws HyracksDataException
+     */
+    int unallocate(Slot s) throws HyracksDataException;
 
-	/**
-	 * @param frameIndex
-	 * @return the specified frame, from the set of memory buffers, being
-	 *         managed by this memory manager
-	 */
-	ByteBuffer getFrame(int frameIndex);
+    /**
+     * @param frameIndex
+     * @return the specified frame, from the set of memory buffers, being
+     *         managed by this memory manager
+     */
+    ByteBuffer getFrame(int frameIndex);
 
-	/**
-	 * Writes the specified tuple into the specified memory slot (denoted by
-	 * frameIX and offset)
-	 * 
-	 * @param frameIx
-	 * @param offset
-	 * @param src
-	 * @param tIndex
-	 * @return
-	 */
-	boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src,
-			int tIndex);
+    /**
+     * Writes the specified tuple into the specified memory slot (denoted by
+     * frameIx and offset)
+     * 
+     * @param frameIx
+     * @param offset
+     * @param src
+     * @param tIndex
+     * @return
+     */
+    boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src, int tIndex);
 
-	/**
-	 * Reads the specified tuple (denoted by frameIx and offset) and appends it
-	 * to the passed FrameTupleAppender
-	 * 
-	 * @param frameIx
-	 * @param offset
-	 * @param dest
-	 * @return
-	 */
-	boolean readTuple(int frameIx, int offset, FrameTupleAppender dest);
+    /**
+     * Reads the specified tuple (denoted by frameIx and offset) and appends it
+     * to the passed FrameTupleAppender
+     * 
+     * @param frameIx
+     * @param offset
+     * @param dest
+     * @return
+     */
+    boolean readTuple(int frameIx, int offset, FrameTupleAppender dest);
 
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
index 1d55bc8..7568e26 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -8,12 +8,12 @@
 /**
  * @author pouria
  * 
- *         Inteface for the Run Generator
+ *         Interface for the Run Generator
  */
 public interface IRunGenerator extends IFrameWriter {
-	/**
-	 * 
-	 * @return the list of generated runs, each stored sorted
-	 */
-	public List<IFrameReader> getRuns();
+
+    /**
+     * @return the list of generated (sorted) runs
+     */
+    public List<IFrameReader> getRuns();
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
index 2cc731c..73407f3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
@@ -1,53 +1,71 @@
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 /**
- * @author pouria Defines the selection tree, used in sorting with replacement
- *         selection to manage the order of outputing tuples into the runs,
- *         during the run generation phase. At each point of time, this tree
- *         contains tuples, belonging to two different runs: - Current run,
- *         being written to the output - Next run
+ * @author pouria
+ * 
+ *         Defines the selection tree, used in sorting with replacement
+ *         selection to manage the order of output tuples into the runs, during
+ *         the run generation phase. This tree contains tuples, belonging to two
+ *         different runs: - Current run (being written to the output) - Next
+ *         run
  */
+
 public interface ISelectionTree {
-	/**
-	 * Inserts a new element into the selection.
-	 * 
-	 * @param element
-	 *            contains the pointer to the memory slot, containing the tuple,
-	 *            along with its run number
-	 */
-	void insert(int[] element);
 
-	/**
-	 * @return Removes and returns the smallest elemnt in the tree
-	 */
-	int[] getMin();
+    /**
+     * Inserts a new element into the selectionTree
+     * 
+     * @param element
+     *            contains the pointer to the memory slot, containing the tuple,
+     *            along with its run number
+     */
+    void insert(int[] element);
 
-	/**
-	 * @return Removes and returns the largest elemnt in the tree
-	 */
-	int[] getMax();
+    /**
+     * Removes and returns the smallest element in the tree
+     * 
+     * @param result
+     *            is the array that will eventually contain minimum entry
+     *            pointer
+     */
+    void getMin(int[] result);
 
-	/**
-	 * @return True of the selection tree does not have any element, and false
-	 *         otherwise.
-	 */
-	boolean isEmpty();
+    /**
+     * Removes and returns the largest element in the tree
+     * 
+     * @param result
+     *            is the array that will eventually contain maximum entry
+     *            pointer
+     */
+    void getMax(int[] result);
 
-	/**
-	 * Removes all the elements in the tree
-	 */
-	void reset();
+    /**
+     * @return True of the selection tree does not have any element, false
+     *         otherwise
+     */
+    boolean isEmpty();
 
-	/**
-	 * 
-	 * @return Returns, but does NOT remove, the smallest element in the tree
-	 */
-	int[] peekMin();
+    /**
+     * Removes all the elements in the tree
+     */
+    void reset();
 
-	/**
-	 * 
-	 * @return Returns, but does NOT remove, the largest element in the tree
-	 */
-	int[] peekMax();
+    /**
+     * Returns (and does NOT remove) the smallest element in the tree
+     * 
+     * @param result
+     *            is the array that will eventually contain minimum entry
+     *            pointer
+     */
+    void peekMin(int[] result);
+
+    /**
+     * Returns (and does NOT remove) the largest element in the tree
+     * 
+     * @param result
+     *            is the array that will eventually contain maximum entry
+     *            pointer
+     */
+    void peekMax(int[] result);
 
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index e617ccb..2f1c94c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -30,7 +30,7 @@
 /**
  * @author pouria
  * 
- *         Operator descriptor for sorting with replacement. Consists of two
+ *         Operator descriptor for sorting with replacement, consisting of two
  *         phases:
  * 
  *         - Run Generation: Denoted by OptimizedSortActivity below, in which
@@ -42,188 +42,163 @@
  *         - Merging: Denoted by MergeActivity below, in which runs (generated
  *         in the previous phase) get merged via a merger. Each run has a single
  *         buffer in memory, and a priority queue is used to select the top
- *         tuple each time and send it to the new run/output
+ *         tuple each time. Top tuple is send to a new run or output
  */
-public class OptimizedExternalSortOperatorDescriptor extends
-		AbstractOperatorDescriptor {
 
-	private static final int NO_LIMIT = -1;
+public class OptimizedExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
 
-	private static final long serialVersionUID = 1L;
+    private static final int NO_LIMIT = -1;
+    private static final long serialVersionUID = 1L;
+    private static final int SORT_ACTIVITY_ID = 0;
+    private static final int MERGE_ACTIVITY_ID = 1;
 
-	private static final int SORT_ACTIVITY_ID = 0;
-	private static final int MERGE_ACTIVITY_ID = 1;
+    private final int[] sortFields;
+    private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final int memSize;
+    private final int outputLimit;
 
-	private final int[] sortFields;
-	private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
-	private final IBinaryComparatorFactory[] comparatorFactories;
-	private final int memSize;
-	private final int outputLimit;
+    public OptimizedExternalSortOperatorDescriptor(JobSpecification spec, int framesLimit, int[] sortFields,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+        this(spec, framesLimit, NO_LIMIT, sortFields, null, comparatorFactories, recordDescriptor);
+    }
 
-	public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
-			int framesLimit, int[] sortFields,
-			IBinaryComparatorFactory[] comparatorFactories,
-			RecordDescriptor recordDescriptor) {
-		this(spec, framesLimit, NO_LIMIT, sortFields, null,
-				comparatorFactories, recordDescriptor);
-	}
+    public OptimizedExternalSortOperatorDescriptor(JobSpecification spec, int framesLimit, int outputLimit,
+            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+        this(spec, framesLimit, outputLimit, sortFields, null, comparatorFactories, recordDescriptor);
+    }
 
-	public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
-			int framesLimit, int outputLimit, int[] sortFields,
-			IBinaryComparatorFactory[] comparatorFactories,
-			RecordDescriptor recordDescriptor) {
-		this(spec, framesLimit, outputLimit, sortFields, null,
-				comparatorFactories, recordDescriptor);
-	}
+    public OptimizedExternalSortOperatorDescriptor(JobSpecification spec, int memSize, int outputLimit,
+            int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        this.memSize = memSize;
+        this.outputLimit = outputLimit;
+        this.sortFields = sortFields;
+        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.comparatorFactories = comparatorFactories;
+        if (memSize <= 1) {
+            throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
+        }
+        recordDescriptors[0] = recordDescriptor;
+    }
 
-	public OptimizedExternalSortOperatorDescriptor(JobSpecification spec,
-			int memSize, int outputLimit, int[] sortFields,
-			INormalizedKeyComputerFactory firstKeyNormalizerFactory,
-			IBinaryComparatorFactory[] comparatorFactories,
-			RecordDescriptor recordDescriptor) {
-		super(spec, 1, 1);
-		this.memSize = memSize;
-		this.outputLimit = outputLimit;
-		this.sortFields = sortFields;
-		this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
-		this.comparatorFactories = comparatorFactories;
-		if (memSize <= 1) {
-			throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
-		}
-		recordDescriptors[0] = recordDescriptor;
-	}
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+        OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
 
-	@Override
-	public void contributeActivities(IActivityGraphBuilder builder) {
-		OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(
-				odId, SORT_ACTIVITY_ID));
-		OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(
-				odId, MERGE_ACTIVITY_ID));
+        builder.addActivity(osa);
+        builder.addSourceEdge(0, osa, 0);
 
-		builder.addActivity(osa);
-		builder.addSourceEdge(0, osa, 0);
+        builder.addActivity(oma);
+        builder.addTargetEdge(0, oma, 0);
 
-		builder.addActivity(oma);
-		builder.addTargetEdge(0, oma, 0);
+        builder.addBlockingEdge(osa, oma);
+    }
 
-		builder.addBlockingEdge(osa, oma);
-	}
+    public static class OptimizedSortTaskState extends AbstractTaskState {
+        private List<IFrameReader> runs;
 
-	public static class OptimizedSortTaskState extends AbstractTaskState {
-		private List<IFrameReader> runs;
+        public OptimizedSortTaskState() {
+        }
 
-		public OptimizedSortTaskState() {
-		}
+        private OptimizedSortTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
 
-		private OptimizedSortTaskState(JobId jobId, TaskId taskId) {
-			super(jobId, taskId);
-		}
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
 
-		@Override
-		public void toBytes(DataOutput out) throws IOException {
+        }
 
-		}
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
 
-		@Override
-		public void fromBytes(DataInput in) throws IOException {
+        }
+    }
 
-		}
-	}
+    private class OptimizedSortActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
 
-	private class OptimizedSortActivity extends AbstractActivityNode {
-		private static final long serialVersionUID = 1L;
+        public OptimizedSortActivity(ActivityId id) {
+            super(id);
+        }
 
-		public OptimizedSortActivity(ActivityId id) {
-			super(id);
-		}
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            final IRunGenerator runGen;
+            if (outputLimit == NO_LIMIT) {
+                runGen = new OptimizedExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
+                        comparatorFactories, recordDescriptors[0], memSize);
+            } else {
+                runGen = new OptimizedExternalSortRunGeneratorWithLimit(ctx, sortFields, firstKeyNormalizerFactory,
+                        comparatorFactories, recordDescriptors[0], memSize, outputLimit);
+            }
 
-		@Override
-		public IOperatorNodePushable createPushRuntime(
-				final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-				IRecordDescriptorProvider recordDescProvider,
-				final int partition, int nPartitions) {
-			final IRunGenerator runGen;
-			if (outputLimit == NO_LIMIT) {
-				runGen = new OptimizedExternalSortRunGenerator(ctx, sortFields,
-						firstKeyNormalizerFactory, comparatorFactories,
-						recordDescriptors[0], memSize);
-			} else {
-				runGen = new OptimizedExternalSortRunGeneratorWithLimit(ctx,
-						sortFields, firstKeyNormalizerFactory,
-						comparatorFactories, recordDescriptors[0], memSize,
-						outputLimit);
-			}
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                @Override
+                public void open() throws HyracksDataException {
 
-			IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-				@Override
-				public void open() throws HyracksDataException {
+                    runGen.open();
+                }
 
-					runGen.open();
-				}
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    runGen.nextFrame(buffer);
+                }
 
-				@Override
-				public void nextFrame(ByteBuffer buffer)
-						throws HyracksDataException {
-					runGen.nextFrame(buffer);
-				}
+                @Override
+                public void close() throws HyracksDataException {
+                    OptimizedSortTaskState state = new OptimizedSortTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
+                    runGen.close();
+                    state.runs = runGen.getRuns();
+                    env.setTaskState(state);
 
-				@Override
-				public void close() throws HyracksDataException {
-					OptimizedSortTaskState state = new OptimizedSortTaskState(
-							ctx.getJobletContext().getJobId(), new TaskId(
-									getActivityId(), partition));
-					runGen.close();
-					state.runs = runGen.getRuns();
-					env.setTaskState(state);
+                }
 
-				}
+                @Override
+                public void fail() throws HyracksDataException {
+                    runGen.fail();
+                }
+            };
+            return op;
+        }
+    }
 
-				@Override
-				public void fail() throws HyracksDataException {
-					runGen.fail();
-				}
-			};
-			return op;
-		}
-	}
+    private class OptimizedMergeActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
 
-	private class OptimizedMergeActivity extends AbstractActivityNode {
-		private static final long serialVersionUID = 1L;
+        public OptimizedMergeActivity(ActivityId id) {
+            super(id);
+        }
 
-		public OptimizedMergeActivity(ActivityId id) {
-			super(id);
-		}
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+                @Override
+                public void initialize() throws HyracksDataException {
+                    OptimizedSortTaskState state = (OptimizedSortTaskState) env.getTaskState(new TaskId(new ActivityId(
+                            getOperatorId(), SORT_ACTIVITY_ID), partition));
 
-		@Override
-		public IOperatorNodePushable createPushRuntime(
-				final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-				IRecordDescriptorProvider recordDescProvider,
-				final int partition, int nPartitions) {
-			IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-				@Override
-				public void initialize() throws HyracksDataException {
-					OptimizedSortTaskState state = (OptimizedSortTaskState) env
-							.getTaskState(new TaskId(new ActivityId(
-									getOperatorId(), SORT_ACTIVITY_ID),
-									partition));
+                    List<IFrameReader> runs = state.runs;
 
-					List<IFrameReader> runs = state.runs;
+                    IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                    for (int i = 0; i < comparatorFactories.length; ++i) {
+                        comparators[i] = comparatorFactories[i].createBinaryComparator();
+                    }
 
-					IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-					for (int i = 0; i < comparatorFactories.length; ++i) {
-						comparators[i] = comparatorFactories[i]
-								.createBinaryComparator();
-					}
+                    OptimizedExternalSortRunMerger merger = new OptimizedExternalSortRunMerger(ctx, outputLimit, runs,
+                            sortFields, comparators, recordDescriptors[0], memSize, writer);
 
-					OptimizedExternalSortRunMerger merger = new OptimizedExternalSortRunMerger(
-							ctx, outputLimit, runs, sortFields, comparators,
-							recordDescriptors[0], memSize, writer);
+                    merger.process();
 
-					merger.process();
-
-				}
-			};
-			return op;
-		}
-	}
+                }
+            };
+            return op;
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
index 13ba5c7..1430e6f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
@@ -30,284 +30,277 @@
  *         memory.
  * 
  *         The overall process is as follows: - Read the input data frame by
- *         frame. For each tuple T in the current frame: - Try to allocate a
- *         memory slot for writing T along with the attached header/footer (for
- *         memory management purpose) - If T can not be allocated, try to output
- *         as many as tuples, currently resident in memory, so that a free slot,
- *         large enough to hold T, gets created. MinHeap decides about which
- *         tuple should be outputed at each step. Write T into the memory. -
- *         Calculate the runID of T (based on the last output tuple for the
+ *         frame. For each tuple T in the current frame:
+ * 
+ *         - Try to allocate a memory slot for writing T along with the attached
+ *         header/footer (for memory management purpose)
+ * 
+ *         - If T can not be allocated, try to output as many tuples, currently
+ *         resident in memory, as needed so that a free slot, large enough to
+ *         hold T, gets created. MinHeap decides about which tuple should be
+ *         sent to the output at each step.
+ * 
+ *         - Write T into the memory
+ * 
+ *         - Calculate the runID of T (based on the last output tuple for the
  *         current run). It is either the current run or the next run. Also
  *         calculate Poorman's Normalized Key (PNK) for T, to make comparisons
- *         faster later. - Create an heap element for T, containing its runID,
- *         the slot ptr to its memory location, and its PNK. - Insert the heap
- *         element into the heap. - Upon closing, write all the tuples,
- *         currently resident in memory, into their corresponding run(s). Again
- *         min heap decides about which tuple is the next for output.
+ *         faster later.
+ * 
+ *         - Create a heap element for T, containing: its runID, the slot
+ *         pointer to its memory location, and its PNK.
+ * 
+ *         - Insert the created heap element into the heap
+ * 
+ *         - Upon closing, write all the tuples, currently resident in memory,
+ *         into their corresponding run(s). Again min heap decides about which
+ *         tuple is the next for output.
  * 
  *         OptimizedSortOperatorDescriptor will merge the generated runs, to
- *         generate the final sorted output opf the data.
+ *         generate the final sorted output of the data.
  */
 public class OptimizedExternalSortRunGenerator implements IRunGenerator {
-	private final IHyracksTaskContext ctx;
-	private final int[] sortFields;
-	private final INormalizedKeyComputer nkc;
-	IBinaryComparatorFactory[] comparatorFactories;
-	private final IBinaryComparator[] comparators;
-	private final RecordDescriptor recordDescriptor;
-	private final List<IFrameReader> runs;
+    private final IHyracksTaskContext ctx;
+    private final int[] sortFields;
+    private final INormalizedKeyComputer nkc;
+    IBinaryComparatorFactory[] comparatorFactories;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDescriptor;
+    private final List<IFrameReader> runs;
 
-	private ISelectionTree sTree;
-	private IMemoryManager memMgr;
+    private ISelectionTree sTree;
+    private IMemoryManager memMgr;
 
-	private final int memSize;
-	private FrameTupleAccessor inputAccessor; // Used to read tuples in
-												// nextFrame()
-	private FrameTupleAppender outputAppender; // Used to write tuple to the
-												// dedicated output buffer
-	private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
-										// into run(s)
-	private FrameTupleAccessor lastRecordAccessor; // Used to read last output
-													// record from the output
-													// buffer
-	private int curRunSize;
-	private int lastTupleIx; // Holds index of last output tuple in the
-								// dedicated output buffer
-	private Slot allocationPtr; // Contains the ptr to the allocated memory slot
-								// by the memory manager for the new tuple
-	private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
-								// the selectionTree to output
-	private int[] sTreeTop;
+    private final int memSize;
+    private FrameTupleAccessor inputAccessor; // Used to read tuples in
+                                              // nextFrame()
+    private FrameTupleAppender outputAppender; // Used to write tuple to the
+                                               // dedicated output buffer
+    private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
+                                     // into run(s)
+    private FrameTupleAccessor lastRecordAccessor; // Used to read last output
+                                                   // record from the output
+                                                   // buffer
+    private int curRunSize;
+    private int lastTupleIx; // Holds index of last output tuple in the
+                             // dedicated output buffer
+    private Slot allocationPtr; // Contains the ptr to the allocated memory slot
+                                // by the memory manager for the new tuple
+    private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
+                                // the selectionTree to output
+    private int[] sTreeTop;
 
-	RunFileWriter writer;
+    RunFileWriter writer;
 
-	private boolean newRun;
-	private int curRunId;
+    private boolean newRun;
+    private int curRunId;
 
-	public OptimizedExternalSortRunGenerator(IHyracksTaskContext ctx,
-			int[] sortFields,
-			INormalizedKeyComputerFactory firstKeyNormalizerFactory,
-			IBinaryComparatorFactory[] comparatorFactories,
-			RecordDescriptor recordDesc, int memSize) {
-		this.ctx = ctx;
-		this.sortFields = sortFields;
-		nkc = firstKeyNormalizerFactory == null ? null
-				: firstKeyNormalizerFactory.createNormalizedKeyComputer();
-		this.comparatorFactories = comparatorFactories;
-		comparators = new IBinaryComparator[comparatorFactories.length];
-		for (int i = 0; i < comparatorFactories.length; ++i) {
-			comparators[i] = comparatorFactories[i].createBinaryComparator();
-		}
-		this.recordDescriptor = recordDesc;
-		this.runs = new LinkedList<IFrameReader>();
-		this.memSize = memSize;
-	}
+    public OptimizedExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, int memSize) {
+        this.ctx = ctx;
+        this.sortFields = sortFields;
+        nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+        this.comparatorFactories = comparatorFactories;
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.recordDescriptor = recordDesc;
+        this.runs = new LinkedList<IFrameReader>();
+        this.memSize = memSize;
+    }
 
-	@Override
-	public void open() throws HyracksDataException {
-		runs.clear();
-		inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-				recordDescriptor);
-		outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-		outputBuffer = ctx.allocateFrame();
-		outputAppender.reset(outputBuffer, true);
-		lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-				recordDescriptor);
+    @Override
+    public void open() throws HyracksDataException {
+        runs.clear();
+        inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputBuffer = ctx.allocateFrame();
+        outputAppender.reset(outputBuffer, true);
+        lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
 
-		this.memMgr = new BSTMemMgr(ctx, memSize);
-		this.sTree = new SortMinHeap(ctx, sortFields, comparatorFactories,
-				recordDescriptor, memMgr);
-		this.allocationPtr = new Slot();
-		this.outputedTuple = new Slot();
-		this.sTreeTop = new int[] { -1, -1, -1, -1 };
-		curRunId = -1;
-		openNewRun();
-	}
+        this.memMgr = new BSTMemMgr(ctx, memSize);
+        this.sTree = new SortMinHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
+        this.allocationPtr = new Slot();
+        this.outputedTuple = new Slot();
+        this.sTreeTop = new int[] { -1, -1, -1, -1 };
+        curRunId = -1;
+        openNewRun();
+    }
 
-	@Override
-	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-		inputAccessor.reset(buffer);
-		byte[] bufferArray = buffer.array();
-		int tupleCount = inputAccessor.getTupleCount();
-		for (int i = 0; i < tupleCount; ++i) {
-			allocationPtr.clear();
-			int tLength = inputAccessor.getTupleEndOffset(i)
-					- inputAccessor.getTupleStartOffset(i);
-			memMgr.allocate(tLength, allocationPtr);
-			while (allocationPtr.isNull()) {
-				int unAllocSize = -1;
-				while (unAllocSize < tLength) {
-					unAllocSize = outputRecord();
-					if (unAllocSize < 1) {
-						throw new HyracksDataException(
-								"Unable to allocate space for the new tuple, while there is no more tuple to output");
-					}
-				}
-				memMgr.allocate(tLength, allocationPtr);
-				((BSTMemMgr) memMgr)._debug_decLookupCount();
-			}
-			memMgr.writeTuple(allocationPtr.getFrameIx(),
-					allocationPtr.getOffset(), inputAccessor, i);
-			int runId = getRunId(inputAccessor, i);
-			int pnk = getPNK(inputAccessor, i, bufferArray);
-			int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
-					allocationPtr.getOffset(), pnk };
-			sTree.insert(entry);
-		}
-	}
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inputAccessor.reset(buffer);
+        byte[] bufferArray = buffer.array();
+        int tupleCount = inputAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; ++i) {
+            allocationPtr.clear();
+            int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
+            memMgr.allocate(tLength, allocationPtr);
+            while (allocationPtr.isNull()) {
+                int unAllocSize = -1;
+                while (unAllocSize < tLength) {
+                    unAllocSize = outputRecord();
+                    if (unAllocSize < 1) {
+                        throw new HyracksDataException(
+                                "Unable to allocate space for the new tuple, while there is no more tuple to output");
+                    }
+                }
+                memMgr.allocate(tLength, allocationPtr);
+                ((BSTMemMgr) memMgr).debugDecLookupCount();
+            }
+            memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
+            int runId = getRunId(inputAccessor, i);
+            int pnk = getPNK(inputAccessor, i, bufferArray);
+            int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
+            sTree.insert(entry);
+        }
+    }
 
-	@Override
-	public void fail() throws HyracksDataException {
-	}
+    @Override
+    public void fail() throws HyracksDataException {
+    }
 
-	@Override
-	public void close() throws HyracksDataException {
-		while (!sTree.isEmpty()) { // Outputting remaining elements in the
-									// selectionTree
-			outputRecord();
-		}
-		if (outputAppender.getTupleCount() > 0) { // Writing out very last
-													// resident records to file
-			FrameUtils.flushFrame(outputBuffer, writer);
-		}
-		outputAppender.reset(outputBuffer, true);
-		writer.close();
-		runs.add(writer.createReader());
-	}
+    @Override
+    public void close() throws HyracksDataException {
+        while (!sTree.isEmpty()) { // Outputting remaining elements in the
+                                   // selectionTree
+            outputRecord();
+        }
+        if (outputAppender.getTupleCount() > 0) { // Writing out very last
+                                                  // resident records to file
+            FrameUtils.flushFrame(outputBuffer, writer);
+        }
+        outputAppender.reset(outputBuffer, true);
+        writer.close();
+        runs.add(writer.createReader());
+    }
 
-	public List<IFrameReader> getRuns() {
-		return runs;
-	}
+    public List<IFrameReader> getRuns() {
+        return runs;
+    }
 
-	private int outputRecord() throws HyracksDataException {
-		outputedTuple.clear();
-		sTreeTop = sTree.getMin();
-		if (!isEntryValid(sTreeTop)) {
-			throw new HyracksDataException(
-					"Invalid outputed tuple (Top of the selection tree is invalid)");
-		}
+    private int outputRecord() throws HyracksDataException {
+        outputedTuple.clear();
+        sTree.getMin(sTreeTop);
+        if (!isEntryValid(sTreeTop)) {
+            throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
+        }
 
-		if (sTreeTop[SortMinHeap.RUN_ID_IX] != curRunId) { // We need to switch
-															// runs
-			openNewRun();
-		}
+        if (sTreeTop[SortMinHeap.RUN_ID_IX] != curRunId) { // We need to switch
+                                                           // runs
+            openNewRun();
+        }
 
-		int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
-		int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
-		if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
-																	// append to
-																	// the
-																	// tupleAppender
-			FrameUtils.flushFrame(outputBuffer, writer);
-			outputAppender.reset(outputBuffer, true);
-			if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-				throw new HyracksDataException(
-						"Can not append to the ouput buffer in sort");
-			}
-			lastTupleIx = 0;
-		} else {
-			lastTupleIx++;
-		}
-		outputedTuple.set(tFrameIx, tOffset);
-		newRun = false;
-		curRunSize++;
-		return memMgr.unallocate(outputedTuple);
+        int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+        int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+        if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
+                                                                    // append to
+                                                                    // the
+                                                                    // tupleAppender
+            FrameUtils.flushFrame(outputBuffer, writer);
+            outputAppender.reset(outputBuffer, true);
+            if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+                throw new HyracksDataException("Can not append to the ouput buffer in sort");
+            }
+            lastTupleIx = 0;
+        } else {
+            lastTupleIx++;
+        }
+        outputedTuple.set(tFrameIx, tOffset);
+        newRun = false;
+        curRunSize++;
+        return memMgr.unallocate(outputedTuple);
 
-	}
+    }
 
-	private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) { // Moved
-																				// buffInArray
-																				// out
-																				// for
-																				// better
-																				// performance
-																				// (not
-																				// converting
-																				// for
-																				// each
-																				// and
-																				// every
-																				// tuple)
-		int sfIdx = sortFields[0];
-		int tStart = fta.getTupleStartOffset(tIx);
-		int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
-		int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
-		int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
-		return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel
-				- f0StartRel));
-	}
+    private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) { // Moved
+                                                                              // buffInArray
+                                                                              // out
+                                                                              // for
+                                                                              // better
+                                                                              // performance
+                                                                              // (not
+                                                                              // converting
+                                                                              // for
+                                                                              // each
+                                                                              // and
+                                                                              // every
+                                                                              // tuple)
+        int sfIdx = sortFields[0];
+        int tStart = fta.getTupleStartOffset(tIx);
+        int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
+        int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
+        int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
+        return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel - f0StartRel));
+    }
 
-	private int getRunId(FrameTupleAccessor fta, int tupIx) { // Comparing
-																// current
-																// record to
-																// last output
-																// record, it
-																// decided about
-																// current
-																// record's
-																// runId
-		if (newRun) { // Very first record for a new run
-			return curRunId;
-		}
+    private int getRunId(FrameTupleAccessor fta, int tupIx) { // Comparing
+                                                              // current
+                                                              // record to
+                                                              // last output
+                                                              // record, it
+                                                              // decides about
+                                                              // current
+                                                              // record's
+                                                              // runId
+        if (newRun) { // Very first record for a new run
+            return curRunId;
+        }
 
-		byte[] lastRecBuff = outputBuffer.array();
-		lastRecordAccessor.reset(outputBuffer);
-		int lastStartOffset = lastRecordAccessor
-				.getTupleStartOffset(lastTupleIx);
+        byte[] lastRecBuff = outputBuffer.array();
+        lastRecordAccessor.reset(outputBuffer);
+        int lastStartOffset = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
 
-		ByteBuffer fr2 = fta.getBuffer();
-		byte[] curRecBuff = fr2.array();
-		int r2StartOffset = fta.getTupleStartOffset(tupIx);
+        ByteBuffer fr2 = fta.getBuffer();
+        byte[] curRecBuff = fr2.array();
+        int r2StartOffset = fta.getTupleStartOffset(tupIx);
 
-		for (int f = 0; f < comparators.length; ++f) {
-			int fIdx = sortFields[f];
-			int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset
-					+ (fIdx - 1) * 4);
-			int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
-			int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength()
-					+ f1Start;
-			int l1 = f1End - f1Start;
-			int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
-					* 4);
-			int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
-			int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
-			int l2 = f2End - f2Start;
-			int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2,
-					l2);
-			if (c != 0) {
-				if (c <= 0) {
-					return curRunId;
-				} else {
-					return (curRunId + 1);
-				}
-			}
-		}
-		return curRunId;
-	}
+        for (int f = 0; f < comparators.length; ++f) {
+            int fIdx = sortFields[f];
+            int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset + (fIdx - 1) * 4);
+            int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
+            int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength() + f1Start;
+            int l1 = f1End - f1Start;
+            int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
+            int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+            int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
+            int l2 = f2End - f2Start;
+            int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2, l2);
+            if (c != 0) {
+                if (c <= 0) {
+                    return curRunId;
+                } else {
+                    return (curRunId + 1);
+                }
+            }
+        }
+        return curRunId;
+    }
 
-	private void openNewRun() throws HyracksDataException {
-		if (writer != null) { // There is a prev run, so flush its tuples and
-								// close it first
-			if (outputAppender.getTupleCount() > 0) {
-				FrameUtils.flushFrame(outputBuffer, writer);
-			}
-			outputAppender.reset(outputBuffer, true);
-			writer.close();
-			runs.add(writer.createReader());
-		}
+    private void openNewRun() throws HyracksDataException {
+        if (writer != null) { // There is a prev run, so flush its tuples and
+                              // close it first
+            if (outputAppender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+            outputAppender.reset(outputBuffer, true);
+            writer.close();
+            runs.add(writer.createReader());
+        }
 
-		FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-				ExternalSortRunGenerator.class.getSimpleName());
-		writer = new RunFileWriter(file, ctx.getIOManager());
-		writer.open();
-		curRunId++;
-		newRun = true;
-		curRunSize = 0;
-		lastTupleIx = -1;
-	}
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                ExternalSortRunGenerator.class.getSimpleName());
+        writer = new RunFileWriter(file, ctx.getIOManager());
+        writer.open();
+        curRunId++;
+        newRun = true;
+        curRunSize = 0;
+        lastTupleIx = -1;
+    }
 
-	private boolean isEntryValid(int[] entry) {
-		return ((entry[SortMinHeap.RUN_ID_IX] > -1)
-				&& (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
-	}
+    private boolean isEntryValid(int[] entry) {
+        return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
index df40577..f69f676 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
@@ -24,37 +24,51 @@
  * 
  *         This class implements the run generator for sorting with replacement
  *         selection, where there is a limit on the output, i.e. we are looking
- *         for top-k tuples (first k smallest tuples w.r.t sorting keys), if
- *         data were sorted, where k is a specific limit, decided in advance.
+ *         for top-k tuples (first k smallest tuples w.r.t sorting keys).
  * 
  *         A SortMinMaxHeap is used as the selectionTree to decide the order of
  *         writing tuples into the runs, and also to prune tuples (if possible).
- *         Memory manager is based on a binary search tree to allocate tuples.
+ *         Memory manager is based on a binary search tree and is used to
+ *         allocate memory slots for tuples.
  * 
- *         The overall process is as follows (Assuming that the limit is K): -
- *         Read the input data frame by frame. For each tuple T in the current
- *         frame: - If currentRun R has reached the limit of K on the size, and
- *         (T > maximum tuple of R), then ignore T. - Otherwise, try to allocate
- *         a memory slot for writing T along with the attached header/footer
- *         (for memory management purpose) - If T can not be allocated, try to
- *         output as many as tuples, currently resident in memory, so that a
- *         free slot, large enough to hold T, gets created. MinMaxHeap decides
- *         about which tuple should be outputed at each step. Write T into the
- *         memory. - Calculate the runID of T (based on the last output tuple
- *         for the current run). It is either the current run or the next run.
- *         Also calculate Poorman's Normalized Key (PNK) for T, to make
- *         comparisons faster later. - Create an heap element for T, containing
- *         its runID, the slot ptr to its memory location, and its PNK. - If
- *         runID is the nextRun, insert the heap element into the heap, and
- *         increment the size of nextRun in the stats. - If runID is the
- *         currentRun: - If currentRun has not hit the limit of k, insert the
- *         element into the heap, and increase currentRun size in the stats. -
- *         (arriving here, currentRun has hit the limit of K, while T is less
- *         than the max). Discard the current max for the current run (by
- *         pop-ing it from the heap and unallocating its memory location) and
- *         insert the heap element into the heap. No need to change the
- *         currentRun size as we are replacing an old element (the old max) with
- *         T.
+ *         The overall process is as follows (Assuming that the limit is K):
+ * 
+ *         - Read the input data frame by frame. For each tuple T in the current
+ *         frame:
+ * 
+ *         - If currentRun R has reached the limit of K on the size, and (T >
+ *         maximum tuple of R), then ignore T.
+ * 
+ *         - Otherwise, try to allocate a memory slot for writing T along with
+ *         the attached header/footer (for memory management purpose)
+ * 
+ *         - If T can not be allocated, try to output as many tuples, currently
+ *         resident in memory, as needed so that a free slot, large enough to
+ *         hold T, gets created. MinMaxHeap decides about which tuple should be
+ *         sent to the output at each step.
+ * 
+ *         - Write T into memory.
+ * 
+ *         - Calculate the runID of T (based on the last output tuple for the
+ *         current run). It is either the current run or the next run. Also
+ *         calculate Poorman's Normalized Key (PNK) for T, to make comparisons
+ *         faster later.
+ * 
+ *         - Create an heap element for T, containing its runID, the slot ptr to
+ *         its memory location, and its PNK.
+ * 
+ *         - If runID is the nextRun, insert the heap element into the heap, and
+ *         increment the size of nextRun.
+ * 
+ *         - If runID is the currentRun, then:
+ * 
+ *         - If currentRun has not hit the limit of k, insert the element into
+ *         the heap, and increase currentRun size. - Otherwise, currentRun has
+ *         hit the limit of K, while T is less than the max. So discard the
+ *         current max for the current run (by poping it from the heap and
+ *         unallocating its memory location) and insert the heap element into
+ *         the heap. No need to change the currentRun size as we are replacing
+ *         an old element (the old max) with T.
  * 
  *         - Upon closing, write all the tuples, currently resident in memory,
  *         into their corresponding run(s).
@@ -64,412 +78,378 @@
  *         resident in memory) get discarded at the beginning. MinMax heap can
  *         be used to find these tuples.
  */
-public class OptimizedExternalSortRunGeneratorWithLimit implements
-		IRunGenerator {
+public class OptimizedExternalSortRunGeneratorWithLimit implements IRunGenerator {
 
-	private final IHyracksTaskContext ctx;
-	private final int[] sortFields;
-	private final INormalizedKeyComputer nkc;
-	IBinaryComparatorFactory[] comparatorFactories;
-	private final IBinaryComparator[] comparators;
-	private final RecordDescriptor recordDescriptor;
-	private final List<IFrameReader> runs;
+    private final IHyracksTaskContext ctx;
+    private final int[] sortFields;
+    private final INormalizedKeyComputer nkc;
+    IBinaryComparatorFactory[] comparatorFactories;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDescriptor;
+    private final List<IFrameReader> runs;
 
-	private ISelectionTree sTree;
-	private IMemoryManager memMgr;
+    private ISelectionTree sTree;
+    private IMemoryManager memMgr;
 
-	private final int memSize;
-	private FrameTupleAccessor inputAccessor; // Used to read tuples in
-												// nextFrame()
-	private FrameTupleAppender outputAppender; // Used to write tuple to the
-												// dedicated output buffer
-	private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
-										// into run(s)
-	private FrameTupleAccessor lastRecordAccessor; // Used to read last output
-													// record from the output
-													// buffer
-	private FrameTupleAccessor fta2; // Used to read max record
-	private final int outputLimit;
-	private int curRunSize;
-	private int nextRunSize;
-	private int lastTupleIx; // Holds index of last output tuple in the
-								// dedicated output buffer
-	private Slot allocationPtr; // Contains the ptr to the allocated memory slot
-								// by the memory manager for the new tuple
-	private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
-								// the selectionTree to output
-	private Slot discard;
-	private int[] sTreeTop;
-	private int[] peek;
-	RunFileWriter writer;
-	private boolean newRun;
-	private int curRunId;
+    private final int memSize;
+    private FrameTupleAccessor inputAccessor; // Used to read tuples in
+                                              // nextFrame()
+    private FrameTupleAppender outputAppender; // Used to write tuple to the
+                                               // dedicated output buffer
+    private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
+                                     // into run(s)
+    private FrameTupleAccessor lastRecordAccessor; // Used to read last output
+                                                   // record from the output
+                                                   // buffer
+    private FrameTupleAccessor fta2; // Used to read max record
+    private final int outputLimit;
+    private int curRunSize;
+    private int nextRunSize;
+    private int lastTupleIx; // Holds index of last output tuple in the
+                             // dedicated output buffer
+    private Slot allocationPtr; // Contains the ptr to the allocated memory slot
+                                // by the memory manager for the new tuple
+    private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
+                                // the selectionTree to output
+    private Slot discard;
+    private int[] sTreeTop;
+    private int[] peek;
+    RunFileWriter writer;
+    private boolean newRun;
+    private int curRunId;
 
-	public OptimizedExternalSortRunGeneratorWithLimit(IHyracksTaskContext ctx,
-			int[] sortFields,
-			INormalizedKeyComputerFactory firstKeyNormalizerFactory,
-			IBinaryComparatorFactory[] comparatorFactories,
-			RecordDescriptor recordDesc, int memSize, int limit) {
+    public OptimizedExternalSortRunGeneratorWithLimit(IHyracksTaskContext ctx, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, int memSize, int limit) {
 
-		this.ctx = ctx;
-		this.sortFields = sortFields;
-		nkc = firstKeyNormalizerFactory == null ? null
-				: firstKeyNormalizerFactory.createNormalizedKeyComputer();
-		this.comparatorFactories = comparatorFactories;
-		comparators = new IBinaryComparator[comparatorFactories.length];
-		for (int i = 0; i < comparatorFactories.length; ++i) {
-			comparators[i] = comparatorFactories[i].createBinaryComparator();
-		}
-		this.recordDescriptor = recordDesc;
-		this.runs = new LinkedList<IFrameReader>();
-		this.memSize = memSize;
+        this.ctx = ctx;
+        this.sortFields = sortFields;
+        nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+        this.comparatorFactories = comparatorFactories;
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.recordDescriptor = recordDesc;
+        this.runs = new LinkedList<IFrameReader>();
+        this.memSize = memSize;
 
-		this.outputLimit = limit;
-	}
+        this.outputLimit = limit;
+    }
 
-	@Override
-	public void open() throws HyracksDataException {
-		runs.clear();
-		inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-				recordDescriptor);
-		outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-		outputBuffer = ctx.allocateFrame();
-		outputAppender.reset(outputBuffer, true);
-		lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-				recordDescriptor);
-		fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-		this.memMgr = new BSTMemMgr(ctx, memSize);
-		this.sTree = new SortMinMaxHeap(ctx, sortFields, comparatorFactories,
-				recordDescriptor, memMgr);
-		this.allocationPtr = new Slot();
-		this.outputedTuple = new Slot();
-		this.sTreeTop = new int[] { -1, -1, -1, -1 };
-		this.peek = new int[] { -1, -1, -1, -1 };
-		this.discard = new Slot();
+    @Override
+    public void open() throws HyracksDataException {
+        runs.clear();
+        inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputBuffer = ctx.allocateFrame();
+        outputAppender.reset(outputBuffer, true);
+        lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        this.memMgr = new BSTMemMgr(ctx, memSize);
+        this.sTree = new SortMinMaxHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
+        this.allocationPtr = new Slot();
+        this.outputedTuple = new Slot();
+        this.sTreeTop = new int[] { -1, -1, -1, -1 };
+        this.peek = new int[] { -1, -1, -1, -1 };
+        this.discard = new Slot();
 
-		curRunId = -1;
-		curRunSize = 0;
-		nextRunSize = 0;
-		openNewRun();
-	}
+        curRunId = -1;
+        curRunSize = 0;
+        nextRunSize = 0;
+        openNewRun();
+    }
 
-	@Override
-	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-		inputAccessor.reset(buffer);
-		byte[] bufferArray = buffer.array();
-		int tupleCount = inputAccessor.getTupleCount();
-		for (int i = 0; i < tupleCount; i++) {
-			if (curRunSize >= outputLimit) {
-				peek = sTree.peekMax();
-				if (isEntryValid(peek)
-						&& compareRecords(inputAccessor, i,
-								peek[SortMinMaxHeap.FRAME_IX],
-								peek[SortMinMaxHeap.OFFSET_IX]) >= 0) {
-					continue;
-				}
-			}
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inputAccessor.reset(buffer);
+        byte[] bufferArray = buffer.array();
+        int tupleCount = inputAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            if (curRunSize >= outputLimit) {
+                sTree.peekMax(peek);
+                if (isEntryValid(peek)
+                        && compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
+                                peek[SortMinMaxHeap.OFFSET_IX]) >= 0) {
+                    continue;
+                }
+            }
 
-			allocationPtr.clear();
-			int tLength = inputAccessor.getTupleEndOffset(i)
-					- inputAccessor.getTupleStartOffset(i);
-			memMgr.allocate(tLength, allocationPtr);
-			while (allocationPtr.isNull()) {
-				int unAllocSize = -1;
-				while (unAllocSize < tLength) {
-					unAllocSize = outputRecord();
-					if (unAllocSize < 1) {
-						throw new HyracksDataException(
-								"Unable to allocate space for the new tuple, while there is no more tuple to output");
-					}
-				}
-				memMgr.allocate(tLength, allocationPtr);
-			}
+            allocationPtr.clear();
+            int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
+            memMgr.allocate(tLength, allocationPtr);
+            while (allocationPtr.isNull()) {
+                int unAllocSize = -1;
+                while (unAllocSize < tLength) {
+                    unAllocSize = outputRecord();
+                    if (unAllocSize < 1) {
+                        throw new HyracksDataException(
+                                "Unable to allocate space for the new tuple, while there is no more tuple to output");
+                    }
+                }
+                memMgr.allocate(tLength, allocationPtr);
+            }
 
-			int pnk = getPNK(inputAccessor, i, bufferArray);
-			int runId = getRunId(inputAccessor, i);
-			if (runId != curRunId) { // tuple belongs to the next run
-				memMgr.writeTuple(allocationPtr.getFrameIx(),
-						allocationPtr.getOffset(), inputAccessor, i);
-				int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
-						allocationPtr.getOffset(), pnk };
-				sTree.insert(entry);
-				nextRunSize++;
-				continue;
-			}
-			// belongs to the current run
-			if (curRunSize < outputLimit) {
-				memMgr.writeTuple(allocationPtr.getFrameIx(),
-						allocationPtr.getOffset(), inputAccessor, i);
-				int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
-						allocationPtr.getOffset(), pnk };
-				sTree.insert(entry);
-				curRunSize++;
-				continue;
-			}
+            int pnk = getPNK(inputAccessor, i, bufferArray);
+            int runId = getRunId(inputAccessor, i);
+            if (runId != curRunId) { // tuple belongs to the next run
+                memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
+                int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
+                sTree.insert(entry);
+                nextRunSize++;
+                continue;
+            }
+            // belongs to the current run
+            if (curRunSize < outputLimit) {
+                memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
+                int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
+                sTree.insert(entry);
+                curRunSize++;
+                continue;
+            }
 
-			peek = sTree.peekMax();
-			if (compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
-					peek[SortMinMaxHeap.OFFSET_IX]) > 0) {
-				continue;
-			}
-			// replacing the max
-			peek = sTree.getMax();
-			discard.set(peek[SortMinMaxHeap.FRAME_IX],
-					peek[SortMinMaxHeap.OFFSET_IX]);
-			memMgr.unallocate(discard);
-			memMgr.writeTuple(allocationPtr.getFrameIx(),
-					allocationPtr.getOffset(), inputAccessor, i);
-			int[] entry = new int[] { runId, allocationPtr.getFrameIx(),
-					allocationPtr.getOffset(), pnk };
-			sTree.insert(entry);
-		}
-	}
+            sTree.peekMax(peek);
+            if (compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]) > 0) {
+                continue;
+            }
+            // replacing the max
+            sTree.getMax(peek);
+            discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
+            memMgr.unallocate(discard);
+            memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
+            int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
+            sTree.insert(entry);
+        }
+    }
 
-	@Override
-	public void fail() throws HyracksDataException {
-	}
+    @Override
+    public void fail() throws HyracksDataException {
+    }
 
-	@Override
-	public void close() throws HyracksDataException {
-		while (!sTree.isEmpty()) { // Outputting remaining elements in the
-									// selectionTree
-			outputRecordForClose();
-		}
+    @Override
+    public void close() throws HyracksDataException {
+        while (!sTree.isEmpty()) { // Outputting remaining elements in the
+                                   // selectionTree
+            outputRecordForClose();
+        }
 
-		if (outputAppender.getTupleCount() > 0) { // Writing out very last
-													// resident records to file
-			FrameUtils.flushFrame(outputBuffer, writer);
-		}
+        if (outputAppender.getTupleCount() > 0) { // Writing out very last
+                                                  // resident records to file
+            FrameUtils.flushFrame(outputBuffer, writer);
+        }
 
-		writer.close();
-		runs.add(writer.createReader());
-	}
+        writer.close();
+        runs.add(writer.createReader());
+    }
 
-	public List<IFrameReader> getRuns() {
-		return runs;
-	}
+    public List<IFrameReader> getRuns() {
+        return runs;
+    }
 
-	private int outputRecord() throws HyracksDataException {
-		outputedTuple.clear();
-		sTreeTop = sTree.getMin();
-		if (!isEntryValid(sTreeTop)) {
-			throw new HyracksDataException(
-					"Invalid outputed tuple (Top of the selection tree is invalid)");
-		}
-		int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
-		int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
-		if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] == curRunId) {
-			if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can
-																		// not
-																		// append
-																		// to
-																		// the
-																		// tupleAppender
-				FrameUtils.flushFrame(outputBuffer, writer);
-				outputAppender.reset(outputBuffer, true);
-				if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-					throw new HyracksDataException(
-							"Can not append to the ouput buffer in sort");
-				}
-				lastTupleIx = 0;
-			} else {
-				lastTupleIx++;
-			}
-			outputedTuple.set(tFrameIx, tOffset);
-			newRun = false;
-			return memMgr.unallocate(outputedTuple);
-		}
-		// Minimum belongs to the next Run
-		openNewRun();
-		int popCount = curRunSize - outputLimit;
-		int l = 0;
-		int maxFreedSpace = 0;
-		for (int p = 0; p < popCount; p++) {
-			peek = sTree.getMax();
-			if (!isEntryValid(peek)) {
-				throw new HyracksDataException(
-						"Invalid Maximum extracted from MinMaxHeap");
-			}
-			discard.set(peek[SortMinMaxHeap.FRAME_IX],
-					peek[SortMinMaxHeap.OFFSET_IX]);
-			l = memMgr.unallocate(discard);
-			if (l > maxFreedSpace) {
-				maxFreedSpace = l;
-			}
-			curRunSize--;
-		}
+    private int outputRecord() throws HyracksDataException {
+        outputedTuple.clear();
+        sTree.getMin(sTreeTop);
+        if (!isEntryValid(sTreeTop)) {
+            throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
+        }
+        int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+        int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+        if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] == curRunId) {
+            if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can
+                                                                        // not
+                                                                        // append
+                                                                        // to
+                                                                        // the
+                                                                        // tupleAppender
+                FrameUtils.flushFrame(outputBuffer, writer);
+                outputAppender.reset(outputBuffer, true);
+                if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+                    throw new HyracksDataException("Can not append to the ouput buffer in sort");
+                }
+                lastTupleIx = 0;
+            } else {
+                lastTupleIx++;
+            }
+            outputedTuple.set(tFrameIx, tOffset);
+            newRun = false;
+            return memMgr.unallocate(outputedTuple);
+        }
+        // Minimum belongs to the next Run
+        openNewRun();
+        int popCount = curRunSize - outputLimit;
+        int l = 0;
+        int maxFreedSpace = 0;
+        for (int p = 0; p < popCount; p++) {
+            sTree.getMax(peek);
+            if (!isEntryValid(peek)) {
+                throw new HyracksDataException("Invalid Maximum extracted from MinMaxHeap");
+            }
+            discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
+            l = memMgr.unallocate(discard);
+            if (l > maxFreedSpace) {
+                maxFreedSpace = l;
+            }
+            curRunSize--;
+        }
 
-		if (maxFreedSpace != 0) {
-			return maxFreedSpace;
-		}
-		// No max discarded (We just flushed out the prev run, so the output
-		// buffer should be clear
-		if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
-																	// append to
-																	// the
-																	// tupleAppender
-			throw new HyracksDataException(
-					"Can not append to the ouput buffer in sort");
-		}
-		lastTupleIx = 0;
-		outputedTuple.set(tFrameIx, tOffset);
-		newRun = false;
-		return memMgr.unallocate(outputedTuple);
-	}
+        if (maxFreedSpace != 0) {
+            return maxFreedSpace;
+        }
+        // No max discarded (We just flushed out the prev run, so the output
+        // buffer should be clear)
+        if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
+                                                                    // append to
+                                                                    // the
+                                                                    // tupleAppender
+            throw new HyracksDataException("Can not append to the ouput buffer in sort");
+        }
+        lastTupleIx = 0;
+        outputedTuple.set(tFrameIx, tOffset);
+        newRun = false;
+        return memMgr.unallocate(outputedTuple);
+    }
 
-	private void outputRecordForClose() throws HyracksDataException {
-		sTreeTop = sTree.getMin();
-		if (!isEntryValid(sTreeTop)) {
-			throw new HyracksDataException(
-					"Invalid outputed tuple (Top of the selection tree is invalid)");
-		}
-		int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
-		int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
-		if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] != curRunId) {
-			openNewRun();
-		}
+    private void outputRecordForClose() throws HyracksDataException {
+        sTree.getMin(sTreeTop);
+        if (!isEntryValid(sTreeTop)) {
+            throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
+        }
+        int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
+        int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
+        if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] != curRunId) {
+            openNewRun();
+        }
 
-		if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
-																	// append to
-																	// the
-																	// tupleAppender
-			FrameUtils.flushFrame(outputBuffer, writer);
-			outputAppender.reset(outputBuffer, true);
-			if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-				throw new HyracksDataException(
-						"Can not append to the ouput buffer in sort");
-			}
-		}
-	}
+        if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
+                                                                    // append to
+                                                                    // the
+                                                                    // tupleAppender
+            FrameUtils.flushFrame(outputBuffer, writer);
+            outputAppender.reset(outputBuffer, true);
+            if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
+                throw new HyracksDataException("Can not append to the ouput buffer in sort");
+            }
+        }
+    }
 
-	private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) { // Moved
-																				// buffInArray
-																				// out
-																				// for
-																				// better
-																				// performance
-																				// (not
-																				// converting
-																				// for
-																				// each
-																				// and
-																				// every
-																				// tuple)
-		int sfIdx = sortFields[0];
-		int tStart = fta.getTupleStartOffset(tIx);
-		int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
-		int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
-		int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
-		return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel
-				- f0StartRel));
-	}
+    private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) { // Moved
+                                                                              // buffInArray
+                                                                              // out
+                                                                              // for
+                                                                              // better
+                                                                              // performance
+                                                                              // (not
+                                                                              // converting
+                                                                              // for
+                                                                              // each
+                                                                              // and
+                                                                              // every
+                                                                              // tuple)
+        int sfIdx = sortFields[0];
+        int tStart = fta.getTupleStartOffset(tIx);
+        int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
+        int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
+        int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
+        return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel - f0StartRel));
+    }
 
-	private int getRunId(FrameTupleAccessor fta, int tupIx) { // Comparing
-																// current
-																// record to
-																// last output
-																// record, it
-																// decided about
-																// current
-																// record's
-																// runId
-		if (newRun) { // Very first record for a new run
-			return curRunId;
-		}
+    private int getRunId(FrameTupleAccessor fta, int tupIx) { // Comparing
+                                                              // current
+                                                              // record to
+                                                              // last output
+                                                              // record, it
+                                                              // decides about
+                                                              // current
+                                                              // record's
+                                                              // runId
+        if (newRun) { // Very first record for a new run
+            return curRunId;
+        }
 
-		byte[] lastRecBuff = outputBuffer.array();
-		lastRecordAccessor.reset(outputBuffer);
-		int lastStartOffset = lastRecordAccessor
-				.getTupleStartOffset(lastTupleIx);
+        byte[] lastRecBuff = outputBuffer.array();
+        lastRecordAccessor.reset(outputBuffer);
+        int lastStartOffset = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
 
-		ByteBuffer fr2 = fta.getBuffer();
-		byte[] curRecBuff = fr2.array();
-		int r2StartOffset = fta.getTupleStartOffset(tupIx);
+        ByteBuffer fr2 = fta.getBuffer();
+        byte[] curRecBuff = fr2.array();
+        int r2StartOffset = fta.getTupleStartOffset(tupIx);
 
-		for (int f = 0; f < comparators.length; ++f) {
-			int fIdx = sortFields[f];
-			int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset
-					+ (fIdx - 1) * 4);
-			int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
-			int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength()
-					+ f1Start;
-			int l1 = f1End - f1Start;
-			int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
-					* 4);
-			int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
-			int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
-			int l2 = f2End - f2Start;
-			int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2,
-					l2);
-			if (c != 0) {
-				if (c <= 0) {
-					return curRunId;
-				} else {
-					return (curRunId + 1);
-				}
-			}
-		}
-		return curRunId;
-	}
+        for (int f = 0; f < comparators.length; ++f) {
+            int fIdx = sortFields[f];
+            int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset + (fIdx - 1) * 4);
+            int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
+            int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength() + f1Start;
+            int l1 = f1End - f1Start;
+            int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
+            int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+            int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
+            int l2 = f2End - f2Start;
+            int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2, l2);
+            if (c != 0) {
+                if (c <= 0) {
+                    return curRunId;
+                } else {
+                    return (curRunId + 1);
+                }
+            }
+        }
+        return curRunId;
+    }
 
-	// first<sec : -1
-	private int compareRecords(FrameTupleAccessor fta1, int ix1, int fix2,
-			int offset2) {
-		ByteBuffer buff1 = fta1.getBuffer();
-		byte[] recBuff1 = buff1.array();
-		int offset1 = fta1.getTupleStartOffset(ix1);
+    // first<sec : -1
+    private int compareRecords(FrameTupleAccessor fta1, int ix1, int fix2, int offset2) {
+        ByteBuffer buff1 = fta1.getBuffer();
+        byte[] recBuff1 = buff1.array();
+        int offset1 = fta1.getTupleStartOffset(ix1);
 
-		offset2 += BSTNodeUtil.HEADER_SIZE;
-		ByteBuffer buff2 = memMgr.getFrame(fix2);
-		fta2.reset(buff2);
-		byte[] recBuff2 = buff2.array();
+        offset2 += BSTNodeUtil.HEADER_SIZE;
+        ByteBuffer buff2 = memMgr.getFrame(fix2);
+        fta2.reset(buff2);
+        byte[] recBuff2 = buff2.array();
 
-		for (int f = 0; f < comparators.length; ++f) {
-			int fIdx = sortFields[f];
-			int f1Start = fIdx == 0 ? 0 : buff1
-					.getInt(offset1 + (fIdx - 1) * 4);
-			int f1End = buff1.getInt(offset1 + fIdx * 4);
-			int s1 = offset1 + fta1.getFieldSlotsLength() + f1Start;
-			int l1 = f1End - f1Start;
-			int f2Start = fIdx == 0 ? 0 : buff2
-					.getInt(offset2 + (fIdx - 1) * 4);
-			int f2End = buff2.getInt(offset2 + fIdx * 4);
-			int s2 = offset2 + fta2.getFieldSlotsLength() + f2Start;
-			int l2 = f2End - f2Start;
-			int c = comparators[f].compare(recBuff1, s1, l1, recBuff2, s2, l2);
+        for (int f = 0; f < comparators.length; ++f) {
+            int fIdx = sortFields[f];
+            int f1Start = fIdx == 0 ? 0 : buff1.getInt(offset1 + (fIdx - 1) * 4);
+            int f1End = buff1.getInt(offset1 + fIdx * 4);
+            int s1 = offset1 + fta1.getFieldSlotsLength() + f1Start;
+            int l1 = f1End - f1Start;
+            int f2Start = fIdx == 0 ? 0 : buff2.getInt(offset2 + (fIdx - 1) * 4);
+            int f2End = buff2.getInt(offset2 + fIdx * 4);
+            int s2 = offset2 + fta2.getFieldSlotsLength() + f2Start;
+            int l2 = f2End - f2Start;
+            int c = comparators[f].compare(recBuff1, s1, l1, recBuff2, s2, l2);
 
-			if (c != 0) {
-				return c;
-			}
-		}
-		return 0;
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
 
-	}
+    }
 
-	private void openNewRun() throws HyracksDataException {
-		if (writer != null) { // There is a prev run, so flush its tuples and
-								// close it first
-			if (outputAppender.getTupleCount() > 0) {
-				FrameUtils.flushFrame(outputBuffer, writer);
-			}
-			outputAppender.reset(outputBuffer, true);
-			writer.close();
-			runs.add(writer.createReader());
-		}
+    private void openNewRun() throws HyracksDataException {
+        if (writer != null) { // There is a prev run, so flush its tuples and
+                              // close it first
+            if (outputAppender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+            outputAppender.reset(outputBuffer, true);
+            writer.close();
+            runs.add(writer.createReader());
+        }
 
-		FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-				ExternalSortRunGenerator.class.getSimpleName());
-		writer = new RunFileWriter(file, ctx.getIOManager());
-		writer.open();
-		curRunId++;
-		newRun = true;
-		curRunSize = nextRunSize;
-		nextRunSize = 0;
-		lastTupleIx = -1;
-	}
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                ExternalSortRunGenerator.class.getSimpleName());
+        writer = new RunFileWriter(file, ctx.getIOManager());
+        writer.open();
+        curRunId++;
+        newRun = true;
+        curRunSize = nextRunSize;
+        nextRunSize = 0;
+        lastTupleIx = -1;
+    }
 
-	private boolean isEntryValid(int[] entry) {
-		return ((entry[SortMinHeap.RUN_ID_IX] > -1)
-				&& (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
-	}
+    private boolean isEntryValid(int[] entry) {
+        return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
index 67f3498..53da739 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunMerger.java
@@ -45,189 +45,184 @@
  *         find the top tuple at each iteration, among all the runs' heads in
  *         memory (check RunMergingFrameReader for more details). Otherwise,
  *         assuming that we have R runs and M memory buffers, where (R > M), we
- *         first merge first (M-1) runs and create a new sorted run. Discarding
- *         the first (M-1) runs, now merging step gets applied recursively on
- *         the (R-M+2) remaining runs using the M memory buffers.
+ *         first merge first (M-1) runs and create a new sorted run, out of
+ *         them. Discarding the first (M-1) runs, now merging procedure gets
+ *         applied recursively on the (R-M+2) remaining runs using the M memory
+ *         buffers.
  * 
- *         Merging also takes the outputLimit L, if specified, into account
- *         during merging. Once the final pass is done on the runs (which is the
- *         pass that generates the final sorted output), as soon as the output
- *         hits the limit L, the process stops, closes, and returns.
+ *         Merging also takes the outputLimit, if specified, into account during
+ *         merging. Once the final pass is done on the runs (which is the pass
+ *         that generates the final sorted output), as soon as the output size
+ *         hits the output limit, the process stops, closes, and returns.
  */
 
 public class OptimizedExternalSortRunMerger {
-	private final IHyracksTaskContext ctx;
-	private final List<IFrameReader> runs;
-	private final int[] sortFields;
-	private final IBinaryComparator[] comparators;
-	private final RecordDescriptor recordDesc;
-	private final int framesLimit;
-	private final IFrameWriter writer;
-	private List<ByteBuffer> inFrames;
-	private ByteBuffer outFrame;
-	private FrameTupleAppender outFrameAppender;
-	private FrameTupleAccessor outFrameAccessor;
-	private final int outputLimit;
-	private int currentSize;
+    private final IHyracksTaskContext ctx;
+    private final List<IFrameReader> runs;
+    private final int[] sortFields;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDesc;
+    private final int framesLimit;
+    private final IFrameWriter writer;
+    private List<ByteBuffer> inFrames;
+    private ByteBuffer outFrame;
+    private FrameTupleAppender outFrameAppender;
+    private FrameTupleAccessor outFrameAccessor;
+    private final int outputLimit;
+    private int currentSize;
 
-	public OptimizedExternalSortRunMerger(IHyracksTaskContext ctx,
-			int outputLimit, List<IFrameReader> runs, int[] sortFields,
-			IBinaryComparator[] comparators, RecordDescriptor recordDesc,
-			int framesLimit, IFrameWriter writer) {
-		this.ctx = ctx;
-		this.runs = new LinkedList<IFrameReader>(runs);
-		this.sortFields = sortFields;
-		this.comparators = comparators;
-		this.recordDesc = recordDesc;
-		this.framesLimit = framesLimit;
-		this.writer = writer;
-		this.outputLimit = outputLimit;
-		this.currentSize = 0;
-	}
+    public OptimizedExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs,
+            int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
+            IFrameWriter writer) {
+        this.ctx = ctx;
+        this.runs = new LinkedList<IFrameReader>(runs);
+        this.sortFields = sortFields;
+        this.comparators = comparators;
+        this.recordDesc = recordDesc;
+        this.framesLimit = framesLimit;
+        this.writer = writer;
+        this.outputLimit = outputLimit;
+        this.currentSize = 0;
+    }
 
-	public void process() throws HyracksDataException {
-		writer.open();
+    public void process() throws HyracksDataException {
+        writer.open();
 
-		try {
-			outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-					recordDesc);
-			outFrame = ctx.allocateFrame();
-			outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
-			outFrameAppender.reset(outFrame, true);
+        try {
+            outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+            outFrame = ctx.allocateFrame();
+            outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+            outFrameAppender.reset(outFrame, true);
 
-			if (runs.size() == 1) {
+            if (runs.size() == 1) {
 
-				if (outputLimit < 1) {
-					runs.get(0).open();
-					ByteBuffer nextFrame = ctx.allocateFrame();
-					while (runs.get(0).nextFrame(nextFrame)) {
-						FrameUtils.flushFrame(nextFrame, writer);
-						outFrameAppender.reset(nextFrame, true);
-					}
-					return;
-				}
+                if (outputLimit < 1) {
+                    runs.get(0).open();
+                    ByteBuffer nextFrame = ctx.allocateFrame();
+                    while (runs.get(0).nextFrame(nextFrame)) {
+                        FrameUtils.flushFrame(nextFrame, writer);
+                        outFrameAppender.reset(nextFrame, true);
+                    }
+                    return;
+                }
 
-				int totalCount = 0;
-				runs.get(0).open();
-				FrameTupleAccessor fta = new FrameTupleAccessor(
-						ctx.getFrameSize(), recordDesc);
-				ByteBuffer nextFrame = ctx.allocateFrame();
-				while (totalCount <= outputLimit
-						&& runs.get(0).nextFrame(nextFrame)) {
-					fta.reset(nextFrame);
-					int tupCount = fta.getTupleCount();
-					if ((totalCount + tupCount) < outputLimit) {
-						FrameUtils.flushFrame(nextFrame, writer);
-						totalCount += tupCount;
-						continue;
-					}
-					// The very last buffer, which exceeds the limit
-					int copyCount = outputLimit - totalCount;
-					outFrameAppender.reset(outFrame, true);
-					for (int i = 0; i < copyCount; i++) {
-						if (!outFrameAppender.append(fta, i)) {
-							throw new IllegalStateException();
-						}
-						totalCount++;
-					}
-				}
+                int totalCount = 0;
+                runs.get(0).open();
+                FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+                ByteBuffer nextFrame = ctx.allocateFrame();
+                while (totalCount <= outputLimit && runs.get(0).nextFrame(nextFrame)) {
+                    fta.reset(nextFrame);
+                    int tupCount = fta.getTupleCount();
+                    if ((totalCount + tupCount) < outputLimit) {
+                        FrameUtils.flushFrame(nextFrame, writer);
+                        totalCount += tupCount;
+                        continue;
+                    }
+                    // The very last buffer, which exceeds the limit
+                    int copyCount = outputLimit - totalCount;
+                    outFrameAppender.reset(outFrame, true);
+                    for (int i = 0; i < copyCount; i++) {
+                        if (!outFrameAppender.append(fta, i)) {
+                            throw new IllegalStateException();
+                        }
+                        totalCount++;
+                    }
+                }
 
-				if (outFrameAppender.getTupleCount() > 0) {
-					FrameUtils.flushFrame(outFrame, writer);
-					outFrameAppender.reset(outFrame, true);
-				}
+                if (outFrameAppender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(outFrame, writer);
+                    outFrameAppender.reset(outFrame, true);
+                }
 
-				return;
-			}
+                return;
+            }
 
-			// More than one run, actual merging is needed
-			inFrames = new ArrayList<ByteBuffer>();
-			for (int i = 0; i < framesLimit - 1; ++i) {
-				inFrames.add(ctx.allocateFrame());
-			}
-			while (runs.size() > 0) {
-				try {
-					doPass(runs);
-				} catch (Exception e) {
-					throw new HyracksDataException(e);
-				}
-			}
+            // More than one run, actual merging is needed
+            inFrames = new ArrayList<ByteBuffer>();
+            for (int i = 0; i < framesLimit - 1; ++i) {
+                inFrames.add(ctx.allocateFrame());
+            }
+            while (runs.size() > 0) {
+                try {
+                    doPass(runs);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
 
-		} catch (Exception e) {
-			writer.fail();
-			throw new HyracksDataException(e);
-		} finally {
-			writer.close();
-		}
-	}
+        } catch (Exception e) {
+            writer.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+        }
+    }
 
-	// creates a new run from runs that can fit in memory.
-	private void doPass(List<IFrameReader> runs) throws HyracksDataException {
-		FileReference newRun = null;
-		IFrameWriter writer = this.writer;
-		boolean finalPass = false;
-		if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
-			finalPass = true;
-			for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
-				inFrames.remove(i);
-			}
-		} else {
-			newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
-					.getSimpleName());
-			writer = new RunFileWriter(newRun, ctx.getIOManager());
-			writer.open();
-		}
-		try {
-			IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
-			for (int i = 0; i < inFrames.size(); i++) {
-				runCursors[i] = runs.get(i);
-			}
-			RunMergingFrameReader merger = new RunMergingFrameReader(ctx,
-					runCursors, inFrames, sortFields, comparators, recordDesc);
-			merger.open();
-			try {
-				while (merger.nextFrame(outFrame)) {
-					if (outputLimit > 0 && finalPass) {
-						outFrameAccessor.reset(outFrame);
-						int count = outFrameAccessor.getTupleCount();
-						if ((currentSize + count) > outputLimit) {
-							ByteBuffer b = ctx.allocateFrame();
-							FrameTupleAppender partialAppender = new FrameTupleAppender(
-									ctx.getFrameSize());
-							partialAppender.reset(b, true);
-							int copyCount = outputLimit - currentSize;
-							for (int i = 0; i < copyCount; i++) {
-								partialAppender.append(outFrameAccessor, i);
-								currentSize++;
-							}
-							FrameUtils.makeReadable(b);
-							FrameUtils.flushFrame(b, writer);
-							break;
-						} else {
-							FrameUtils.flushFrame(outFrame, writer);
-							currentSize += count;
-						}
-					} else {
-						FrameUtils.flushFrame(outFrame, writer);
-					}
-				}
-			} finally {
-				merger.close();
-			}
+    // creates a new run from runs that can fit in memory.
+    private void doPass(List<IFrameReader> runs) throws HyracksDataException {
+        FileReference newRun = null;
+        IFrameWriter writer = this.writer;
+        boolean finalPass = false;
+        if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
+            finalPass = true;
+            for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+                inFrames.remove(i);
+            }
+        } else {
+            newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
+            writer = new RunFileWriter(newRun, ctx.getIOManager());
+            writer.open();
+        }
+        try {
+            IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
+            for (int i = 0; i < inFrames.size(); i++) {
+                runCursors[i] = runs.get(i);
+            }
+            RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
+                    comparators, recordDesc);
+            merger.open();
+            try {
+                while (merger.nextFrame(outFrame)) {
+                    if (outputLimit > 0 && finalPass) {
+                        outFrameAccessor.reset(outFrame);
+                        int count = outFrameAccessor.getTupleCount();
+                        if ((currentSize + count) > outputLimit) {
+                            ByteBuffer b = ctx.allocateFrame();
+                            FrameTupleAppender partialAppender = new FrameTupleAppender(ctx.getFrameSize());
+                            partialAppender.reset(b, true);
+                            int copyCount = outputLimit - currentSize;
+                            for (int i = 0; i < copyCount; i++) {
+                                partialAppender.append(outFrameAccessor, i);
+                                currentSize++;
+                            }
+                            FrameUtils.makeReadable(b);
+                            FrameUtils.flushFrame(b, writer);
+                            break;
+                        } else {
+                            FrameUtils.flushFrame(outFrame, writer);
+                            currentSize += count;
+                        }
+                    } else {
+                        FrameUtils.flushFrame(outFrame, writer);
+                    }
+                }
+            } finally {
+                merger.close();
+            }
 
-			if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
-				runs.clear();
-				return;
-			}
+            if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
+                runs.clear();
+                return;
+            }
 
-			runs.subList(0, inFrames.size()).clear();
-			if (!finalPass) {
-				runs.add(0, ((RunFileWriter) writer).createReader());
-			}
-		} finally {
-			if (!finalPass) {
-				writer.close();
-			}
-		}
-	}
+            runs.subList(0, inFrames.size()).clear();
+            if (!finalPass) {
+                runs.add(0, ((RunFileWriter) writer).createReader());
+            }
+        } finally {
+            if (!finalPass) {
+                writer.close();
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index e791582..c014ed1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -31,23 +31,14 @@
 
 public class RunMergingFrameReader implements IFrameReader {
     private final IHyracksTaskContext ctx;
-
     private final IFrameReader[] runCursors;
-
     private final List<ByteBuffer> inFrames;
-
     private final int[] sortFields;
-
     private final IBinaryComparator[] comparators;
-
     private final RecordDescriptor recordDesc;
-
     private final FrameTupleAppender outFrameAppender;
-
     private ReferencedPriorityQueue topTuples;
-
     private int[] tupleIndexes;
-
     private FrameTupleAccessor[] tupleAccessors;
 
     public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
index 85bcba2..2ceea17 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
@@ -1,65 +1,70 @@
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 /**
- * @author pouria Defines a slot in the memory, which can be a free or allocated
+ * @author pouria
+ * 
+ *         Defines a slot in the memory, which can be a free or used (allocated)
  *         slot. Memory is a set of frames, ordered as a list. Each tuple is
  *         stored in a slot, where the location of the slot is denoted by a pair
- *         of integers: - The index of the frame, in the lits of frames in
- *         memory - The starting offset of the slot, within the the specific
- *         frame
+ *         of integers:
+ * 
+ *         - The index of the frame, in the list of frames in memory. (referred
+ *         to as frameIx)
+ * 
+ *         - The starting offset of the slot, within that specific frame.
+ *         (referred to as offset)
  */
 public class Slot {
 
-	private int frameIx;
-	private int offset;
+    private int frameIx;
+    private int offset;
 
-	public Slot() {
-		this.frameIx = BSTNodeUtil.INVALID_INDEX;
-		this.offset = BSTNodeUtil.INVALID_INDEX;
-	}
+    public Slot() {
+        this.frameIx = BSTNodeUtil.INVALID_INDEX;
+        this.offset = BSTNodeUtil.INVALID_INDEX;
+    }
 
-	public Slot(int frameIx, int offset) {
-		this.frameIx = frameIx;
-		this.offset = offset;
-	}
+    public Slot(int frameIx, int offset) {
+        this.frameIx = frameIx;
+        this.offset = offset;
+    }
 
-	public void set(int frameIx, int offset) {
-		this.frameIx = frameIx;
-		this.offset = offset;
-	}
+    public void set(int frameIx, int offset) {
+        this.frameIx = frameIx;
+        this.offset = offset;
+    }
 
-	public int getFrameIx() {
-		return frameIx;
-	}
+    public int getFrameIx() {
+        return frameIx;
+    }
 
-	public void setFrameIx(int frameIx) {
-		this.frameIx = frameIx;
-	}
+    public void setFrameIx(int frameIx) {
+        this.frameIx = frameIx;
+    }
 
-	public int getOffset() {
-		return offset;
-	}
+    public int getOffset() {
+        return offset;
+    }
 
-	public void setOffset(int offset) {
-		this.offset = offset;
-	}
+    public void setOffset(int offset) {
+        this.offset = offset;
+    }
 
-	public boolean isNull() {
-		return (frameIx == BSTNodeUtil.INVALID_INDEX)
-				|| (offset == BSTNodeUtil.INVALID_INDEX);
-	}
+    public boolean isNull() {
+        return (frameIx == BSTNodeUtil.INVALID_INDEX) || (offset == BSTNodeUtil.INVALID_INDEX);
+    }
 
-	public void clear() {
-		this.frameIx = BSTNodeUtil.INVALID_INDEX;
-		this.offset = BSTNodeUtil.INVALID_INDEX;
-	}
+    public void clear() {
+        this.frameIx = BSTNodeUtil.INVALID_INDEX;
+        this.offset = BSTNodeUtil.INVALID_INDEX;
+    }
 
-	public void copy(Slot s) {
-		this.frameIx = s.getFrameIx();
-		this.offset = s.getOffset();
-	}
+    public void copy(Slot s) {
+        this.frameIx = s.getFrameIx();
+        this.offset = s.getOffset();
+    }
 
-	public String toString() {
-		return "(" + frameIx + ", " + offset + ")";
-	}
+    public String toString() {
+        return "(" + frameIx + ", " + offset + ")";
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
index d811de8..383fba8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
@@ -12,238 +12,245 @@
 
 /**
  * 
- * @author pouria Implements a minimum binary heap, used as selection tree, for
- *         sort with replacement. This heap structure can only be used as the
- *         min heap (no access to the max element) Two elements in the heap are
+ * @author pouria
+ * 
+ *         Implements a minimum binary heap, used as selection tree, for sort
+ *         with replacement. This heap structure can only be used as the min
+ *         heap (no access to the max element). Elements in the heap are
  *         compared based on their run numbers, and sorting key(s):
  * 
- *         Considering two elements A and B:
+ *         Considering two heap elements A and B:
  * 
  *         if RunNumber(A) > RunNumber(B) then A is larger than B if
- *         RunNumber(A) == RunNumber(B), then A is smaller than B, only if the
- *         value of the sort key(s) in B is greater than A (based on the sort
- *         comparator)
+ *         RunNumber(A) == RunNumber(B), then A is smaller than B, if and only
+ *         if the value of the sort key(s) in B is greater than A (based on the
+ *         sort comparator).
  * 
  */
 public class SortMinHeap implements ISelectionTree {
 
-	static final int RUN_ID_IX = 0;
-	static final int FRAME_IX = 1;
-	static final int OFFSET_IX = 2;
-	final int PNK_IX = 3;
+    static final int RUN_ID_IX = 0;
+    static final int FRAME_IX = 1;
+    static final int OFFSET_IX = 2;
+    private static final int PNK_IX = 3;
 
-	private final int[] sortFields;
-	private final IBinaryComparator[] comparators;
-	private final RecordDescriptor recordDescriptor;
-	private final FrameTupleAccessor fta1;
-	private final FrameTupleAccessor fta2;
+    private final int[] sortFields;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDescriptor;
+    private final FrameTupleAccessor fta1;
+    private final FrameTupleAccessor fta2;
 
-	List<int[]> tree;
-	IMemoryManager memMgr;
+    List<int[]> tree;
+    IMemoryManager memMgr;
+    int[] top; // Used as the temp variable to access the top, to avoid object
+               // creation
 
-	public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields,
-			IBinaryComparatorFactory[] comparatorFactories,
-			RecordDescriptor recordDesc, IMemoryManager memMgr) {
-		this.sortFields = sortFields;
-		this.comparators = new IBinaryComparator[comparatorFactories.length];
-		for (int i = 0; i < comparatorFactories.length; ++i) {
-			this.comparators[i] = comparatorFactories[i]
-					.createBinaryComparator();
-		}
-		this.recordDescriptor = recordDesc;
-		fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-		fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-		this.memMgr = memMgr;
+    public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, IMemoryManager memMgr) {
+        this.sortFields = sortFields;
+        this.comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            this.comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.recordDescriptor = recordDesc;
+        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        this.memMgr = memMgr;
 
-		this.tree = new ArrayList<int[]>();
-	}
+        this.tree = new ArrayList<int[]>();
+    }
 
-	/*
-	 * Assumption (element structure): [RunId][FrameIx][Offset][Poorman NK]
-	 */
-	@Override
-	public int[] getMin() {
-		return (tree.size() > 0 ? delete(0) : new int[] { -1, -1, -1, -1 });
-	}
+    /*
+     * Assumption (element structure): [RunId][FrameIx][Offset][Poorman NK]
+     */
+    @Override
+    public void getMin(int[] result) {
+        if (tree.size() == 0) {
+            result[0] = result[1] = result[2] = result[3] = -1;
+            return;
+        }
 
-	@Override
-	public int[] peekMin() {
-		if (tree.size() == 0) {
-			return (new int[] { -1, -1, -1, -1 });
-		}
-		int[] top = tree.get(0);
-		return new int[] { top[0], top[1], top[2], top[3] };
-	}
+        top = delete(0);
+        for (int i = 0; i < top.length; i++) {
+            result[i] = top[i];
+        }
+    }
 
-	@Override
-	public void insert(int[] e) {
-		tree.add(e);
-		siftUp(tree.size() - 1);
-	}
+    @Override
+    public void peekMin(int[] result) {
+        if (tree.size() == 0) {
+            result[0] = result[1] = result[2] = result[3] = -1;
+            return;
+        }
 
-	@Override
-	public void reset() {
-		this.tree.clear();
-	}
+        top = tree.get(0);
+        for (int i = 0; i < top.length; i++) {
+            result[i] = top[i];
+        }
+    }
 
-	@Override
-	public boolean isEmpty() {
-		return (tree.size() < 1);
-	}
+    @Override
+    public void insert(int[] e) {
+        tree.add(e);
+        siftUp(tree.size() - 1);
+    }
 
-	public int _debugGetSize() {
-		return tree.size();
-	}
+    @Override
+    public void reset() {
+        this.tree.clear();
+    }
 
-	private int[] delete(int nix) {
-		int[] nv = tree.get(nix);
-		int[] last = tree.remove(tree.size() - 1);
+    @Override
+    public boolean isEmpty() {
+        return (tree.size() < 1);
+    }
 
-		if (tree.size() > 0) {
-			tree.set(nix, last);
-		} else {
-			return nv;
-		}
+    public int _debugGetSize() {
+        return tree.size();
+    }
 
-		int pIx = getParent(nix);
-		if (pIx > -1 && (compare(last, tree.get(pIx)) < 0)) {
-			siftUp(nix);
-		} else {
-			siftDown(nix);
-		}
-		return nv;
-	}
+    private int[] delete(int nix) {
+        int[] nv = tree.get(nix);
+        int[] last = tree.remove(tree.size() - 1);
 
-	private void siftUp(int nodeIx) {
-		int p = getParent(nodeIx);
-		if (p < 0) {
-			return;
-		}
-		while (p > -1 && (compare(nodeIx, p) < 0)) {
-			swap(p, nodeIx);
-			nodeIx = p;
-			p = getParent(nodeIx);
-			if (p < 0) { // We are at the root
-				return;
-			}
-		}
-	}
+        if (tree.size() > 0) {
+            tree.set(nix, last);
+        } else {
+            return nv;
+        }
 
-	private void siftDown(int nodeIx) {
-		int mix = getMinOfChildren(nodeIx);
-		if (mix < 0) {
-			return;
-		}
-		while (mix > -1 && (compare(mix, nodeIx) < 0)) {
-			swap(mix, nodeIx);
-			nodeIx = mix;
-			mix = getMinOfChildren(nodeIx);
-			if (mix < 0) { // We hit the leaf level
-				return;
-			}
-		}
-	}
+        int pIx = getParent(nix);
+        if (pIx > -1 && (compare(last, tree.get(pIx)) < 0)) {
+            siftUp(nix);
+        } else {
+            siftDown(nix);
+        }
+        return nv;
+    }
 
-	// first < sec : -1
-	private int compare(int nodeSIx1, int nodeSIx2) {
-		int[] n1 = tree.get(nodeSIx1);
-		int[] n2 = tree.get(nodeSIx2);
-		return (compare(n1, n2));
-	}
+    private void siftUp(int nodeIx) {
+        int p = getParent(nodeIx);
+        if (p < 0) {
+            return;
+        }
+        while (p > -1 && (compare(nodeIx, p) < 0)) {
+            swap(p, nodeIx);
+            nodeIx = p;
+            p = getParent(nodeIx);
+            if (p < 0) { // We are at the root
+                return;
+            }
+        }
+    }
 
-	// first < sec : -1
-	private int compare(int[] n1, int[] n2) {
-		// Compare Run Numbers
-		if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
-			return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
-		}
+    private void siftDown(int nodeIx) {
+        int mix = getMinOfChildren(nodeIx);
+        if (mix < 0) {
+            return;
+        }
+        while (mix > -1 && (compare(mix, nodeIx) < 0)) {
+            swap(mix, nodeIx);
+            nodeIx = mix;
+            mix = getMinOfChildren(nodeIx);
+            if (mix < 0) { // We hit the leaf level
+                return;
+            }
+        }
+    }
 
-		// Compare Poor man Normalized Keys
-		if (n1[PNK_IX] != n2[PNK_IX]) {
-			return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1
-					: 1;
-		}
+    // first < sec : -1
+    private int compare(int nodeSIx1, int nodeSIx2) {
+        int[] n1 = tree.get(nodeSIx1);
+        int[] n2 = tree.get(nodeSIx2);
+        return (compare(n1, n2));
+    }
 
-		return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]),
-				n1[OFFSET_IX], n2[OFFSET_IX]);
-	}
+    // first < sec : -1
+    private int compare(int[] n1, int[] n2) {
+        // Compare Run Numbers
+        if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
+            return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
+        }
 
-	private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset,
-			int r2StartOffset) {
-		byte[] b1 = fr1.array();
-		byte[] b2 = fr2.array();
-		fta1.reset(fr1);
-		fta2.reset(fr2);
-		int headerLen = BSTNodeUtil.HEADER_SIZE;
-		r1StartOffset += headerLen;
-		r2StartOffset += headerLen;
-		for (int f = 0; f < comparators.length; ++f) {
-			int fIdx = sortFields[f];
-			int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1)
-					* 4);
-			int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
-			int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
-			int l1 = f1End - f1Start;
-			int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
-					* 4);
-			int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
-			int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
-			int l2 = f2End - f2Start;
+        // Compare Poor man Normalized Keys
+        if (n1[PNK_IX] != n2[PNK_IX]) {
+            return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1 : 1;
+        }
 
-			int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+        return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]), n1[OFFSET_IX], n2[OFFSET_IX]);
+    }
 
-			if (c != 0) {
-				return c;
-			}
-		}
-		return 0;
-	}
+    private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset) {
+        byte[] b1 = fr1.array();
+        byte[] b2 = fr2.array();
+        fta1.reset(fr1);
+        fta2.reset(fr2);
+        int headerLen = BSTNodeUtil.HEADER_SIZE;
+        r1StartOffset += headerLen;
+        r2StartOffset += headerLen;
+        for (int f = 0; f < comparators.length; ++f) {
+            int fIdx = sortFields[f];
+            int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1) * 4);
+            int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
+            int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
+            int l1 = f1End - f1Start;
+            int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
+            int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+            int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
+            int l2 = f2End - f2Start;
 
-	private int getMinOfChildren(int nix) { // returns index of min child
-		int lix = getLeftChild(nix);
-		if (lix < 0) {
-			return -1;
-		}
-		int rix = getRightChild(nix);
-		if (rix < 0) {
-			return lix;
-		}
-		return ((compare(lix, rix) < 0) ? lix : rix);
-	}
+            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
 
-	private void swap(int n1Ix, int n2Ix) {
-		int[] temp = tree.get(n1Ix);
-		tree.set(n1Ix, tree.get(n2Ix));
-		tree.set(n2Ix, temp);
-	}
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
 
-	private int getLeftChild(int ix) {
-		int lix = 2 * ix + 1;
-		return ((lix < tree.size()) ? lix : -1);
-	}
+    private int getMinOfChildren(int nix) { // returns index of min child
+        int lix = getLeftChild(nix);
+        if (lix < 0) {
+            return -1;
+        }
+        int rix = getRightChild(nix);
+        if (rix < 0) {
+            return lix;
+        }
+        return ((compare(lix, rix) < 0) ? lix : rix);
+    }
 
-	private int getRightChild(int ix) {
-		int rix = 2 * ix + 2;
-		return ((rix < tree.size()) ? rix : -1);
-	}
+    private void swap(int n1Ix, int n2Ix) {
+        int[] temp = tree.get(n1Ix);
+        tree.set(n1Ix, tree.get(n2Ix));
+        tree.set(n2Ix, temp);
+    }
 
-	private int getParent(int ix) {
-		return ((ix - 1) / 2);
-	}
+    private int getLeftChild(int ix) {
+        int lix = 2 * ix + 1;
+        return ((lix < tree.size()) ? lix : -1);
+    }
 
-	private ByteBuffer getFrame(int frameIx) {
-		return (memMgr.getFrame(frameIx));
-	}
+    private int getRightChild(int ix) {
+        int rix = 2 * ix + 2;
+        return ((rix < tree.size()) ? rix : -1);
+    }
 
-	@Override
-	public int[] getMax() {
-		System.err.println("getMax() method not implemented for Min Heap");
-		return null;
-	}
+    private int getParent(int ix) {
+        return ((ix - 1) / 2);
+    }
 
-	@Override
-	public int[] peekMax() {
-		System.err.println("peekMax() method not implemented for Min Heap");
-		return null;
-	}
+    private ByteBuffer getFrame(int frameIx) {
+        return (memMgr.getFrame(frameIx));
+    }
+
+    @Override
+    public void getMax(int[] result) {
+        throw new IllegalStateException("getMax() method not applicable to Min Heap");
+    }
+
+    @Override
+    public void peekMax(int[] result) {
+        throw new IllegalStateException("getMax() method not applicable to Min Heap");
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
index ab66245..c040ccc 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
@@ -18,377 +18,402 @@
  *         elements.
  */
 public class SortMinMaxHeap implements ISelectionTree {
-	static final int RUN_ID_IX = 0;
-	static final int FRAME_IX = 1;
-	static final int OFFSET_IX = 2;
-	final int PNK_IX = 3;
+    static final int RUN_ID_IX = 0;
+    static final int FRAME_IX = 1;
+    static final int OFFSET_IX = 2;
+    private static final int PNK_IX = 3;
+    private static final int NOT_EXIST = -1;
 
-	static final int NOT_EXIST = -1;
+    private final int[] sortFields;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDescriptor;
+    private final FrameTupleAccessor fta1;
+    private final FrameTupleAccessor fta2;
 
-	private final int[] sortFields;
-	private final IBinaryComparator[] comparators;
-	private final RecordDescriptor recordDescriptor;
-	private final FrameTupleAccessor fta1;
-	private final FrameTupleAccessor fta2;
+    List<int[]> tree;
+    IMemoryManager memMgr;
 
-	List<int[]> tree;
-	IMemoryManager memMgr;
+    int[] top; // Used as the temp variable to access the top, to avoid object
+               // creation
 
-	public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields,
-			IBinaryComparatorFactory[] comparatorFactories,
-			RecordDescriptor recordDesc, IMemoryManager memMgr) {
-		this.sortFields = sortFields;
-		this.comparators = new IBinaryComparator[comparatorFactories.length];
-		for (int i = 0; i < comparatorFactories.length; ++i) {
-			this.comparators[i] = comparatorFactories[i]
-					.createBinaryComparator();
-		}
-		this.recordDescriptor = recordDesc;
-		fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-		fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-		this.memMgr = memMgr;
+    public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, IMemoryManager memMgr) {
+        this.sortFields = sortFields;
+        this.comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            this.comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.recordDescriptor = recordDesc;
+        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        this.memMgr = memMgr;
 
-		this.tree = new ArrayList<int[]>();
-	}
+        this.tree = new ArrayList<int[]>();
+    }
 
-	@Override
-	public void insert(int[] element) {
-		tree.add(element);
-		bubbleUp(tree.size() - 1);
-	}
+    @Override
+    public void insert(int[] element) {
+        tree.add(element);
+        bubbleUp(tree.size() - 1);
+    }
 
-	@Override
-	public int[] getMin() {
-		return (tree.size() > 0 ? delete(0) : new int[] { -1, -1, -1, -1 });
-	}
+    @Override
+    public void getMin(int[] result) {
+        if (tree.size() == 0) {
+            result[0] = result[1] = result[2] = result[3] = -1;
+            return;
+        }
 
-	@Override
-	public void reset() {
-		this.tree.clear();
-	}
+        top = delete(0);
+        for (int i = 0; i < top.length; i++) {
+            result[i] = top[i];
+        }
+    }
 
-	@Override
-	public boolean isEmpty() {
-		return (tree.size() < 1);
-	}
+    @Override
+    public void reset() {
+        this.tree.clear();
+    }
 
-	@Override
-	public int[] peekMin() {
-		if (tree.size() == 0) {
-			return (new int[] { -1, -1, -1, -1 });
-		}
-		int[] top = tree.get(0);
-		return new int[] { top[0], top[1], top[2], top[3] };
-	}
+    @Override
+    public boolean isEmpty() {
+        return (tree.size() < 1);
+    }
 
-	@Override
-	public int[] getMax() {
-		if (tree.size() == 1) {
-			return tree.remove(0);
-		}
-		if (tree.size() > 1) {
-			int lc = getLeftChild(0);
-			int rc = getRightChild(0);
-			if (rc == -1) {
-				return delete(lc);
-			}
-			return (compare(lc, rc) < 0) ? delete(rc) : delete(lc);
-		}
-		return new int[] { -1, -1, -1, -1 };
-	}
+    @Override
+    public void peekMin(int[] result) {
+        if (tree.size() == 0) {
+            result[0] = result[1] = result[2] = result[3] = -1;
+            return;
+        }
 
-	@Override
-	public int[] peekMax() {
-		if (tree.size() == 1) {
-			int[] t = tree.get(0);
-			return new int[] { t[0], t[1], t[2], t[3] };
-		}
-		if (tree.size() > 1) {
-			int lc = getLeftChild(0);
-			int rc = getRightChild(0);
-			if (rc == -1) {
-				int[] t = tree.get(lc);
-				return new int[] { t[0], t[1], t[2], t[3] };
-			}
-			int[] t = (compare(lc, rc) < 0) ? tree.get(rc) : tree.get(lc);
-			return new int[] { t[0], t[1], t[2], t[3] };
-		}
-		return new int[] { -1, -1, -1, -1 };
-	}
+        top = tree.get(0);
+        for (int i = 0; i < top.length; i++) {
+            result[i] = top[i];
+        }
+    }
 
-	private int[] delete(int delIx) {
-		int s = tree.size();
-		if (s > 1) {
-			int[] delEntry = tree.get(delIx);
-			int[] last = (tree.remove(s - 1));
-			if (delIx != tree.size()) {
-				tree.set(delIx, last);
-				trickleDown(delIx);
-			}
-			return delEntry;
-		} else if (s == 1) {
-			return tree.remove(0);
-		}
-		return null;
-	}
+    @Override
+    public void getMax(int[] result) {
+        if (tree.size() == 1) {
+            top = tree.remove(0);
+            for (int i = 0; i < top.length; i++) {
+                result[i] = top[i];
+            }
+            return;
+        }
 
-	private void bubbleUp(int ix) {
-		int p = getParentIx(ix);
-		if (isAtMinLevel(ix)) {
-			if (p != NOT_EXIST && compare(p, ix) < 0) {
-				swap(ix, p);
-				bubbleUpMax(p);
-			} else {
-				bubbleUpMin(ix);
-			}
-		} else { // i is at max level
-			if (p != NOT_EXIST && compare(ix, p) < 0) {
-				swap(ix, p);
-				bubbleUpMin(p);
-			} else {
-				bubbleUpMax(ix);
-			}
-		}
-	}
+        if (tree.size() > 1) {
+            int lc = getLeftChild(0);
+            int rc = getRightChild(0);
 
-	private void bubbleUpMax(int ix) {
-		int gp = getGrandParent(ix);
-		if (gp != NOT_EXIST && compare(gp, ix) < 0) {
-			swap(ix, gp);
-			bubbleUpMax(gp);
-		}
-	}
+            if (rc == -1) {
+                top = delete(lc);
+            } else {
+                top = (compare(lc, rc) < 0) ? delete(rc) : delete(lc);
+            }
 
-	private void bubbleUpMin(int ix) {
-		int gp = getGrandParent(ix);
-		if (gp != NOT_EXIST && compare(ix, gp) < 0) {
-			swap(ix, gp);
-			bubbleUpMin(gp);
-		}
-	}
+            for (int i = 0; i < top.length; i++) {
+                result[i] = top[i];
+            }
+            return;
 
-	private void trickleDown(int ix) {
-		if (isAtMinLevel(ix)) {
-			trickleDownMin(ix);
-		} else {
-			trickleDownMax(ix);
-		}
-	}
+        }
+        result[0] = result[1] = result[2] = result[3] = -1;
+    }
 
-	private void trickleDownMax(int ix) {
-		int maxIx = getMaxOfDescendents(ix);
-		if (maxIx == NOT_EXIST) {
-			return;
-		}
-		if (maxIx > getLeftChild(ix) && maxIx > getRightChild(ix)) { // A grand
-																		// children
-			if (compare(ix, maxIx) < 0) {
-				swap(maxIx, ix);
-				int p = getParentIx(maxIx);
-				if (p != NOT_EXIST && compare(maxIx, p) < 0) {
-					swap(maxIx, p);
-				}
-				trickleDownMax(maxIx);
-			}
-		} else { // A children
-			if (compare(ix, maxIx) < 0) {
-				swap(ix, maxIx);
-			}
-		}
-	}
+    @Override
+    public void peekMax(int[] result) {
+        if (tree.size() == 1) {
+            top = tree.get(0);
+            for (int i = 0; i < top.length; i++) {
+                result[i] = top[i];
+            }
+            return;
+        }
+        if (tree.size() > 1) {
+            int lc = getLeftChild(0);
+            int rc = getRightChild(0);
 
-	private void trickleDownMin(int ix) {
-		int minIx = getMinOfDescendents(ix);
-		if (minIx == NOT_EXIST) {
-			return;
-		}
-		if (minIx > getLeftChild(ix) && minIx > getRightChild(ix)) { // A grand
-																		// children
-			if (compare(minIx, ix) < 0) {
-				swap(minIx, ix);
-				int p = getParentIx(minIx);
-				if (p != NOT_EXIST && compare(p, minIx) < 0) {
-					swap(minIx, p);
-				}
-				trickleDownMin(minIx);
-			}
-		} else { // A children
-			if (compare(minIx, ix) < 0) {
-				swap(ix, minIx);
-			}
-		}
-	}
+            if (rc == -1) {
+                top = tree.get(lc);
+            } else {
+                top = (compare(lc, rc) < 0) ? tree.get(rc) : tree.get(lc);
+            }
 
-	// Min among children and grand children
-	private int getMinOfDescendents(int ix) {
-		int lc = getLeftChild(ix);
-		if (lc == NOT_EXIST) {
-			return NOT_EXIST;
-		}
-		int rc = getRightChild(ix);
-		if (rc == NOT_EXIST) {
-			return lc;
-		}
-		int min = (compare(lc, rc) < 0) ? lc : rc;
-		int[] lgc = getLeftGrandChildren(ix);
-		int[] rgc = getRightGrandChildren(ix);
-		for (int k = 0; k < 2; k++) {
-			if (lgc[k] != NOT_EXIST && compare(lgc[k], min) < 0) {
-				min = lgc[k];
-			}
-			if (rgc[k] != NOT_EXIST && compare(rgc[k], min) < 0) {
-				min = rgc[k];
-			}
-		}
-		return min;
-	}
+            for (int i = 0; i < top.length; i++) {
+                result[i] = top[i];
+            }
+            return;
+        }
+        result[0] = result[1] = result[2] = result[3] = -1;
+    }
 
-	// Max among children and grand children
-	private int getMaxOfDescendents(int ix) {
-		int lc = getLeftChild(ix);
-		if (lc == NOT_EXIST) {
-			return NOT_EXIST;
-		}
-		int rc = getRightChild(ix);
-		if (rc == NOT_EXIST) {
-			return lc;
-		}
-		int max = (compare(lc, rc) < 0) ? rc : lc;
-		int[] lgc = getLeftGrandChildren(ix);
-		int[] rgc = getRightGrandChildren(ix);
-		for (int k = 0; k < 2; k++) {
-			if (lgc[k] != NOT_EXIST && compare(max, lgc[k]) < 0) {
-				max = lgc[k];
-			}
-			if (rgc[k] != NOT_EXIST && compare(max, rgc[k]) < 0) {
-				max = rgc[k];
-			}
-		}
-		return max;
-	}
+    private int[] delete(int delIx) {
+        int s = tree.size();
+        if (s > 1) {
+            int[] delEntry = tree.get(delIx);
+            int[] last = (tree.remove(s - 1));
+            if (delIx != tree.size()) {
+                tree.set(delIx, last);
+                trickleDown(delIx);
+            }
+            return delEntry;
+        } else if (s == 1) {
+            return tree.remove(0);
+        }
+        return null;
+    }
 
-	private void swap(int n1Ix, int n2Ix) {
-		int[] temp = tree.get(n1Ix);
-		tree.set(n1Ix, tree.get(n2Ix));
-		tree.set(n2Ix, temp);
-	}
+    private void bubbleUp(int ix) {
+        int p = getParentIx(ix);
+        if (isAtMinLevel(ix)) {
+            if (p != NOT_EXIST && compare(p, ix) < 0) {
+                swap(ix, p);
+                bubbleUpMax(p);
+            } else {
+                bubbleUpMin(ix);
+            }
+        } else { // i is at max level
+            if (p != NOT_EXIST && compare(ix, p) < 0) {
+                swap(ix, p);
+                bubbleUpMin(p);
+            } else {
+                bubbleUpMax(ix);
+            }
+        }
+    }
 
-	private int getParentIx(int i) {
-		if (i == 0) {
-			return NOT_EXIST;
-		}
-		return (i - 1) / 2;
-	}
+    private void bubbleUpMax(int ix) {
+        int gp = getGrandParent(ix);
+        if (gp != NOT_EXIST && compare(gp, ix) < 0) {
+            swap(ix, gp);
+            bubbleUpMax(gp);
+        }
+    }
 
-	private int getGrandParent(int i) {
-		int p = getParentIx(i);
-		return p != -1 ? getParentIx(p) : NOT_EXIST;
-	}
+    private void bubbleUpMin(int ix) {
+        int gp = getGrandParent(ix);
+        if (gp != NOT_EXIST && compare(ix, gp) < 0) {
+            swap(ix, gp);
+            bubbleUpMin(gp);
+        }
+    }
 
-	private int getLeftChild(int i) {
-		int lc = 2 * i + 1;
-		return lc < tree.size() ? lc : NOT_EXIST;
-	}
+    private void trickleDown(int ix) {
+        if (isAtMinLevel(ix)) {
+            trickleDownMin(ix);
+        } else {
+            trickleDownMax(ix);
+        }
+    }
 
-	private int[] getLeftGrandChildren(int i) {
-		int lc = getLeftChild(i);
-		return lc != NOT_EXIST ? new int[] { getLeftChild(lc),
-				getRightChild(lc) } : new int[] { NOT_EXIST, NOT_EXIST };
-	}
+    private void trickleDownMax(int ix) {
+        int maxIx = getMaxOfDescendents(ix);
+        if (maxIx == NOT_EXIST) {
+            return;
+        }
+        if (maxIx > getLeftChild(ix) && maxIx > getRightChild(ix)) { // A grand
+                                                                     // children
+            if (compare(ix, maxIx) < 0) {
+                swap(maxIx, ix);
+                int p = getParentIx(maxIx);
+                if (p != NOT_EXIST && compare(maxIx, p) < 0) {
+                    swap(maxIx, p);
+                }
+                trickleDownMax(maxIx);
+            }
+        } else { // A children
+            if (compare(ix, maxIx) < 0) {
+                swap(ix, maxIx);
+            }
+        }
+    }
 
-	private int getRightChild(int i) {
-		int rc = 2 * i + 2;
-		return rc < tree.size() ? rc : -1;
-	}
+    private void trickleDownMin(int ix) {
+        int minIx = getMinOfDescendents(ix);
+        if (minIx == NOT_EXIST) {
+            return;
+        }
+        if (minIx > getLeftChild(ix) && minIx > getRightChild(ix)) { // A grand
+                                                                     // children
+            if (compare(minIx, ix) < 0) {
+                swap(minIx, ix);
+                int p = getParentIx(minIx);
+                if (p != NOT_EXIST && compare(p, minIx) < 0) {
+                    swap(minIx, p);
+                }
+                trickleDownMin(minIx);
+            }
+        } else { // A children
+            if (compare(minIx, ix) < 0) {
+                swap(ix, minIx);
+            }
+        }
+    }
 
-	private int[] getRightGrandChildren(int i) {
-		int rc = getRightChild(i);
-		return rc != NOT_EXIST ? new int[] { getLeftChild(rc),
-				getRightChild(rc) } : new int[] { NOT_EXIST, NOT_EXIST };
-	}
+    // Min among children and grand children
+    private int getMinOfDescendents(int ix) {
+        int lc = getLeftChild(ix);
+        if (lc == NOT_EXIST) {
+            return NOT_EXIST;
+        }
+        int rc = getRightChild(ix);
+        if (rc == NOT_EXIST) {
+            return lc;
+        }
+        int min = (compare(lc, rc) < 0) ? lc : rc;
+        int[] lgc = getLeftGrandChildren(ix);
+        int[] rgc = getRightGrandChildren(ix);
+        for (int k = 0; k < 2; k++) {
+            if (lgc[k] != NOT_EXIST && compare(lgc[k], min) < 0) {
+                min = lgc[k];
+            }
+            if (rgc[k] != NOT_EXIST && compare(rgc[k], min) < 0) {
+                min = rgc[k];
+            }
+        }
+        return min;
+    }
 
-	private boolean isAtMinLevel(int i) {
-		int l = getLevel(i);
-		return l % 2 == 0 ? true : false;
-	}
+    // Max among children and grand children
+    private int getMaxOfDescendents(int ix) {
+        int lc = getLeftChild(ix);
+        if (lc == NOT_EXIST) {
+            return NOT_EXIST;
+        }
+        int rc = getRightChild(ix);
+        if (rc == NOT_EXIST) {
+            return lc;
+        }
+        int max = (compare(lc, rc) < 0) ? rc : lc;
+        int[] lgc = getLeftGrandChildren(ix);
+        int[] rgc = getRightGrandChildren(ix);
+        for (int k = 0; k < 2; k++) {
+            if (lgc[k] != NOT_EXIST && compare(max, lgc[k]) < 0) {
+                max = lgc[k];
+            }
+            if (rgc[k] != NOT_EXIST && compare(max, rgc[k]) < 0) {
+                max = rgc[k];
+            }
+        }
+        return max;
+    }
 
-	private int getLevel(int i) {
-		if (i == 0) {
-			return 0;
-		}
-		int l = (int) Math.floor(Math.log(i) / Math.log(2));
-		if (i == (((int) Math.pow(2, (l + 1))) - 1)) {
-			return (l + 1);
-		}
-		return l;
-	}
+    private void swap(int n1Ix, int n2Ix) {
+        int[] temp = tree.get(n1Ix);
+        tree.set(n1Ix, tree.get(n2Ix));
+        tree.set(n2Ix, temp);
+    }
 
-	private ByteBuffer getFrame(int frameIx) {
-		return (memMgr.getFrame(frameIx));
-	}
+    private int getParentIx(int i) {
+        if (i == 0) {
+            return NOT_EXIST;
+        }
+        return (i - 1) / 2;
+    }
 
-	// first < sec : -1
-	private int compare(int nodeSIx1, int nodeSIx2) {
-		int[] n1 = tree.get(nodeSIx1);
-		int[] n2 = tree.get(nodeSIx2);
-		return (compare(n1, n2));
-	}
+    private int getGrandParent(int i) {
+        int p = getParentIx(i);
+        return p != -1 ? getParentIx(p) : NOT_EXIST;
+    }
 
-	// first < sec : -1
-	private int compare(int[] n1, int[] n2) {
-		// Compare Run Numbers
-		if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
-			return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
-		}
+    private int getLeftChild(int i) {
+        int lc = 2 * i + 1;
+        return lc < tree.size() ? lc : NOT_EXIST;
+    }
 
-		// Compare Poor man Normalized Keys
-		if (n1[PNK_IX] != n2[PNK_IX]) {
-			return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1
-					: 1;
-		}
+    private int[] getLeftGrandChildren(int i) {
+        int lc = getLeftChild(i);
+        return lc != NOT_EXIST ? new int[] { getLeftChild(lc), getRightChild(lc) } : new int[] { NOT_EXIST, NOT_EXIST };
+    }
 
-		return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]),
-				n1[OFFSET_IX], n2[OFFSET_IX]);
-	}
+    private int getRightChild(int i) {
+        int rc = 2 * i + 2;
+        return rc < tree.size() ? rc : -1;
+    }
 
-	private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset,
-			int r2StartOffset) {
-		byte[] b1 = fr1.array();
-		byte[] b2 = fr2.array();
-		fta1.reset(fr1);
-		fta2.reset(fr2);
-		int headerLen = BSTNodeUtil.HEADER_SIZE;
-		r1StartOffset += headerLen;
-		r2StartOffset += headerLen;
-		for (int f = 0; f < comparators.length; ++f) {
-			int fIdx = sortFields[f];
-			int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1)
-					* 4);
-			int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
-			int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
-			int l1 = f1End - f1Start;
-			int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1)
-					* 4);
-			int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
-			int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
-			int l2 = f2End - f2Start;
+    private int[] getRightGrandChildren(int i) {
+        int rc = getRightChild(i);
+        return rc != NOT_EXIST ? new int[] { getLeftChild(rc), getRightChild(rc) } : new int[] { NOT_EXIST, NOT_EXIST };
+    }
 
-			int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+    private boolean isAtMinLevel(int i) {
+        int l = getLevel(i);
+        return l % 2 == 0 ? true : false;
+    }
 
-			if (c != 0) {
-				return c;
-			}
-		}
-		return 0;
-	}
+    private int getLevel(int i) {
+        if (i == 0) {
+            return 0;
+        }
+        int l = (int) Math.floor(Math.log(i) / Math.log(2));
+        if (i == (((int) Math.pow(2, (l + 1))) - 1)) {
+            return (l + 1);
+        }
+        return l;
+    }
 
-	public String _debugPrintTree() {
-		String s = "";
-		for (int i = 0; i < tree.size(); i++) {
-			int[] n = tree.get(i);
-			s += "\t[" + i + "](" + n[RUN_ID_IX] + ", " + n[FRAME_IX] + ", "
-					+ n[OFFSET_IX] + "), ";
-		}
-		return s;
-	}
+    private ByteBuffer getFrame(int frameIx) {
+        return (memMgr.getFrame(frameIx));
+    }
+
+    // first < sec : -1
+    private int compare(int nodeSIx1, int nodeSIx2) {
+        int[] n1 = tree.get(nodeSIx1);
+        int[] n2 = tree.get(nodeSIx2);
+        return (compare(n1, n2));
+    }
+
+    // first < sec : -1
+    private int compare(int[] n1, int[] n2) {
+        // Compare Run Numbers
+        if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
+            return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
+        }
+
+        // Compare Poor man Normalized Keys
+        if (n1[PNK_IX] != n2[PNK_IX]) {
+            return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1 : 1;
+        }
+
+        return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]), n1[OFFSET_IX], n2[OFFSET_IX]);
+    }
+
+    private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset) {
+        byte[] b1 = fr1.array();
+        byte[] b2 = fr2.array();
+        fta1.reset(fr1);
+        fta2.reset(fr2);
+        int headerLen = BSTNodeUtil.HEADER_SIZE;
+        r1StartOffset += headerLen;
+        r2StartOffset += headerLen;
+        for (int f = 0; f < comparators.length; ++f) {
+            int fIdx = sortFields[f];
+            int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1) * 4);
+            int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
+            int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
+            int l1 = f1End - f1Start;
+            int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
+            int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
+            int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
+            int l2 = f2End - f2Start;
+
+            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    public String _debugPrintTree() {
+        String s = "";
+        for (int i = 0; i < tree.size(); i++) {
+            int[] n = tree.get(i);
+            s += "\t[" + i + "](" + n[RUN_ID_IX] + ", " + n[FRAME_IX] + ", " + n[OFFSET_IX] + "), ";
+        }
+        return s;
+    }
 }
\ No newline at end of file