Addressing Reviewer's comments

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@927 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 8f2bb81..2905574 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -34,7 +34,7 @@
     private final int INVALID_BUFFER = -2;
     private final int UNALLOCATED_FRAME = -3;
     private final int BUFFER_FOR_RESIDENT_PARTS = -1;
-
+    
     private IHyracksTaskContext ctx;
 
     private final String rel0Name;
@@ -579,7 +579,7 @@
         return pStatus;
     }
 
-    public String _stats_debug_getStats() {
+    public String debugGetStats() {
         int numOfResidentPartitions = 0;
         int numOfSpilledPartitions = 0;
         double sumOfBuildSpilledSizes = 0;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index e6a7035..c6c899d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -83,7 +83,7 @@
  */
 
 public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-
+	
     private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
     private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
index f9dda7f..7454523 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
@@ -9,7 +9,6 @@
 
 /**
  * @author pouria
- * 
  *         Implements Memory Manager based on creating Binary Search Tree (BST)
  *         while Free slot size is the key for the BST nodes. Each node in BST
  *         shows a class of free slots, while all the free slots within a class
@@ -18,7 +17,6 @@
  *         as a separate data structure, but the free slots in the memory are
  *         used to hold BST nodes. Each BST node has the logical structure,
  *         defined in the BSTNodeUtil class.
- * 
  */
 public class BSTMemMgr implements IMemoryManager {
 
@@ -28,22 +26,15 @@
     private ByteBuffer[] frames;
     private ByteBuffer convertBuffer;
     private Slot root;
-    private Slot result; // A reusable object to hold one node returned as
-                         // method result
-    private Slot insertSlot; // A reusable object to hold one node within insert
-                             // process
+    private Slot result; // A reusable object to hold one node returned as method result
+    private Slot insertSlot; // A reusable object to hold one node within insert process
+    private Slot lastLeftParent; //A reusable object for the search process
+    private Slot lastLeft; //A reusable object for the search process
+    private Slot parent; //A reusable object for the search process 
 
     private Slot[] parentRes;
     private int lastFrame;
 
-    // Variables used for debugging/performance testing
-    private int debugFreeSlots = 0;
-    private int debugTreeSize = 0;
-    private int debugTotalLookupSteps;
-    private int debugTotalLookupCounts;
-    private int debugDepthCounter;
-    private int debugMaxDepth;
-
     public BSTMemMgr(IHyracksCommonContext ctx, int memSize) {
         this.ctx = ctx;
         frameSize = ctx.getFrameSize();
@@ -53,9 +44,10 @@
         root = new Slot();
         insertSlot = new Slot();
         result = new Slot();
+        lastLeftParent = new Slot();
+        lastLeft = new Slot();
+        parent = new Slot();
         parentRes = new Slot[] { new Slot(), new Slot() };
-        debugTotalLookupCounts = 0;
-        debugTotalLookupSteps = 0;
     }
 
     /**
@@ -63,7 +55,6 @@
      */
     @Override
     public void allocate(int length, Slot result) throws HyracksDataException {
-        debugTotalLookupCounts++;
         search(length, parentRes);
         if (parentRes[1].isNull()) {
             addFrame(parentRes);
@@ -77,8 +68,7 @@
         if (shouldSplit(sl, acLen)) {
             int[] s = split(parentRes[1], parentRes[0], acLen);
             int insertLen = BSTNodeUtil.getLength(s[2], s[3], frames, convertBuffer);
-            insert(s[2], s[3], insertLen); // inserting second half of the split
-                                           // slot
+            insert(s[2], s[3], insertLen); // inserting second half of the split slot
             BSTNodeUtil.setHeaderFooter(s[0], s[1], length, false, frames);
             result.set(s[0], s[1]);
             return;
@@ -97,8 +87,7 @@
                 : BSTNodeUtil.INVALID_INDEX);
         int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
         int nextMemSlotHeaderOffset = (t < frameSize ? t : BSTNodeUtil.INVALID_INDEX);
-        // Remember: next and prev memory slots have the same frame index as the
-        // unallocated slot
+        // Remember: next and prev memory slots have the same frame index as the unallocated slot
         if (!isNodeNull(fix, prevMemSlotFooterOffset) && BSTNodeUtil.isFree(fix, prevMemSlotFooterOffset, frames)) {
             int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooterOffset, frames, convertBuffer);
             removeFromList(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE);
@@ -156,24 +145,12 @@
         return frames[frameIndex];
     }
 
-    public String debugGetAvgSearchPath() {
-        double avg = (((double) debugTotalLookupSteps) / ((double) debugTotalLookupCounts));
-        return "\nTotal allocation requests:\t" + debugTotalLookupCounts + "\nAvg Allocation Path Length:\t" + avg
-                + "\nMax BST Depth:\t" + debugMaxDepth;
-    }
-
-    public void debugDecLookupCount() {
-        debugTotalLookupCounts--;
-    }
-
     /**
-     * 
      * @param parentResult
      *            is the container passed by the caller to contain the results
      * @throws HyracksDataException
      */
     private void addFrame(Slot[] parentResult) throws HyracksDataException {
-        debugDepthCounter = 0;
         clear(parentResult);
         if ((lastFrame + 1) >= frames.length) {
             return;
@@ -192,13 +169,9 @@
         }
 
         while (!parentResult[1].isNull()) {
-            debugDepthCounter++;
             if (BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer) == l) {
                 append(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0);
                 parentResult[1].set(lastFrame, 0);
-                if (debugDepthCounter > debugMaxDepth) {
-                    debugMaxDepth = debugDepthCounter;
-                }
                 return;
             }
             if (l < BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer)) {
@@ -208,9 +181,6 @@
                             frames);
                     parentResult[0].copy(parentResult[1]);
                     parentResult[1].set(lastFrame, 0);
-                    if (debugDepthCounter > debugMaxDepth) {
-                        debugMaxDepth = debugDepthCounter;
-                    }
                     return;
                 } else {
                     parentResult[0].copy(parentResult[1]);
@@ -224,9 +194,6 @@
                             frames);
                     parentResult[0].copy(parentResult[1]);
                     parentResult[1].set(lastFrame, 0);
-                    if (debugDepthCounter > debugMaxDepth) {
-                        debugMaxDepth = debugDepthCounter;
-                    }
                     return;
                 } else {
                     parentResult[0].copy(parentResult[1]);
@@ -239,7 +206,6 @@
     }
 
     private void insert(int fix, int off, int length) throws HyracksDataException {
-        debugDepthCounter = 0;
         BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
         initNewNode(fix, off);
 
@@ -251,13 +217,9 @@
         insertSlot.clear();
         insertSlot.copy(root);
         while (!insertSlot.isNull()) {
-            debugDepthCounter++;
             int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames, convertBuffer);
             if (curSlotLen == length) {
                 append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off);
-                if (debugDepthCounter > debugMaxDepth) {
-                    debugMaxDepth = debugDepthCounter;
-                }
                 return;
             }
             if (length < curSlotLen) {
@@ -266,9 +228,6 @@
                 if (isNodeNull(leftChildFIx, leftChildOffset)) {
                     initNewNode(fix, off);
                     BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
-                    if (debugDepthCounter > debugMaxDepth) {
-                        debugMaxDepth = debugDepthCounter;
-                    }
                     return;
                 } else {
                     insertSlot.set(leftChildFIx, leftChildOffset);
@@ -279,9 +238,6 @@
                 if (isNodeNull(rightChildFIx, rightChildOffset)) {
                     initNewNode(fix, off);
                     BSTNodeUtil.setRightChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
-                    if (debugDepthCounter > debugMaxDepth) {
-                        debugMaxDepth = debugDepthCounter;
-                    }
                     return;
                 } else {
                     insertSlot.set(rightChildFIx, rightChildOffset);
@@ -304,13 +260,12 @@
             return;
         }
 
-        Slot lastLeftParent = new Slot();
-        Slot lastLeft = new Slot();
-        Slot parent = new Slot();
+        lastLeftParent.clear();
+        lastLeft.clear();
+        parent.clear();
         result.copy(root);
 
         while (!result.isNull()) {
-            debugTotalLookupSteps++;
             if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) {
                 target[0].copy(parent);
                 target[1].copy(result);
@@ -658,16 +613,11 @@
     }
 
     public String debugPrintMemory() {
-        debugFreeSlots = 0;
         Slot s = new Slot(0, 0);
         if (s.isNull()) {
             return "memory:\tNull";
         }
 
-        if (BSTNodeUtil.isFree(0, 0, frames)) {
-            debugFreeSlots++;
-        }
-
         String m = "memory:\n" + debugPrintSlot(0, 0) + "\n";
         int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(0, 0, frames, convertBuffer));
         int noff = (length + 2 * BSTNodeUtil.HEADER_SIZE >= frameSize ? BSTNodeUtil.INVALID_INDEX : length + 2
@@ -678,9 +628,6 @@
         }
         s.set(nfix, noff);
         while (!isNodeNull(s.getFrameIx(), s.getOffset())) {
-            if (BSTNodeUtil.isFree(s.getFrameIx(), s.getOffset(), frames)) {
-                debugFreeSlots++;
-            }
             m += debugPrintSlot(s.getFrameIx(), s.getOffset()) + "\n";
             length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(s.getFrameIx(), s.getOffset(), frames,
                     convertBuffer));
@@ -694,16 +641,14 @@
             }
             s.set(nfix, noff);
         }
-        return m + "\nFree Slots:\t" + debugFreeSlots;
+        return m;
     }
 
     public String debugPrintTree() {
-        debugTreeSize = 0;
         Slot node = new Slot();
         node.copy(root);
         if (!node.isNull()) {
-            debugTreeSize++;
-            return debugPrintSubTree(node) + "\nTree Nodes:\t" + debugTreeSize;
+            return debugPrintSubTree(node);
         }
         return "Null";
     }
@@ -727,11 +672,9 @@
                 + debugPrintSlot(lfix, loff) + ") - " + "(RC: " + debugPrintSlot(rfix, roff) + ") - " + "(NX: "
                 + debugPrintSlot(nfix, noff) + ") - " + "(PR: " + debugPrintSlot(pfix, poff) + ")  }\n";
         if (!isNodeNull(lfix, loff)) {
-            debugTreeSize++;
             s += debugPrintSubTree(new Slot(lfix, loff)) + "\n";
         }
         if (!isNodeNull(rfix, roff)) {
-            debugTreeSize++;
             s += debugPrintSubTree(new Slot(rfix, roff)) + "\n";
         }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
index 0294dc0..3a1a629 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
@@ -4,10 +4,8 @@
 
 /**
  * @author pouria
- * 
  *         Implements utility methods, used extensively and repeatedly within
  *         the BSTMemMgr.
- * 
  *         Mainly includes methods to set/get different types of pointers,
  *         required and accessed within BST traversal, along with the methods
  *         for setting/getting length/header/footer of free slots, which have
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 09984aa..98621f5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
@@ -26,14 +12,37 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 
+/**
+ * @author pouria
+ *         This class defines the logic for merging the run, generated during
+ *         the first phase of external sort (for both sorting without replacement
+ *         selection and with it). For the case with replacement selection, this
+ *         code also takes the limit on the output into account (if specified).
+ *         If number of input runs is less than the available memory frames,
+ *         then merging can be done in one pass, by allocating one buffer per
+ *         run, and one buffer as the output buffer. A priorityQueue is used to
+ *         find the top tuple at each iteration, among all the runs' heads in
+ *         memory (check RunMergingFrameReader for more details). Otherwise,
+ *         assuming that we have R runs and M memory buffers, where (R > M), we
+ *         first merge first (M-1) runs and create a new sorted run, out of
+ *         them. Discarding the first (M-1) runs, now merging procedure gets
+ *         applied recursively on the (R-M+2) remaining runs using the M memory
+ *         buffers.
+ *         For the case of replacement selection, if outputLimit is specified,
+ *         once the final pass is done on the runs (which is the pass
+ *         that generates the final sorted output), as soon as the output size
+ *         hits the output limit, the process stops, closes, and returns.
+ */
+
 public class ExternalSortRunMerger {
+
     private final IHyracksTaskContext ctx;
-    private final FrameSorter frameSorter;
     private final List<IFrameReader> runs;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
@@ -44,6 +53,12 @@
     private ByteBuffer outFrame;
     private FrameTupleAppender outFrameAppender;
 
+    private final FrameSorter frameSorter; //Used in External sort, no replacement selection
+    private FrameTupleAccessor outFrameAccessor; //Used in External sort, with replacement selection
+    private final int outputLimit; //Used in External sort, with replacement selection and limit on output size
+    private int currentSize; //Used in External sort, with replacement selection and limit on output size
+
+    //Constructor for external sort, no replacement selection
     public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<IFrameReader> runs,
             int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
             IFrameWriter writer) {
@@ -55,6 +70,22 @@
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
         this.writer = writer;
+        this.outputLimit = -1;
+    }
+
+    //Constructor for external sort with replacement selection
+    public ExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs, int[] sortFields,
+            IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
+        this.ctx = ctx;
+        this.runs = new LinkedList<IFrameReader>(runs);
+        this.sortFields = sortFields;
+        this.comparators = comparators;
+        this.recordDesc = recordDesc;
+        this.framesLimit = framesLimit;
+        this.writer = writer;
+        this.outputLimit = outputLimit;
+        this.currentSize = 0;
+        this.frameSorter = null;
     }
 
     public void process() throws HyracksDataException {
@@ -128,4 +159,145 @@
             }
         }
     }
-}
\ No newline at end of file
+
+    public void processWithReplacementSelection() throws HyracksDataException {
+        writer.open();
+
+        try {
+            outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+            outFrame = ctx.allocateFrame();
+            outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+            outFrameAppender.reset(outFrame, true);
+
+            if (runs.size() == 1) {
+
+                if (outputLimit < 1) {
+                    runs.get(0).open();
+                    ByteBuffer nextFrame = ctx.allocateFrame();
+                    while (runs.get(0).nextFrame(nextFrame)) {
+                        FrameUtils.flushFrame(nextFrame, writer);
+                        outFrameAppender.reset(nextFrame, true);
+                    }
+                    return;
+                }
+
+                int totalCount = 0;
+                runs.get(0).open();
+                FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+                ByteBuffer nextFrame = ctx.allocateFrame();
+                while (totalCount <= outputLimit && runs.get(0).nextFrame(nextFrame)) {
+                    fta.reset(nextFrame);
+                    int tupCount = fta.getTupleCount();
+                    if ((totalCount + tupCount) < outputLimit) {
+                        FrameUtils.flushFrame(nextFrame, writer);
+                        totalCount += tupCount;
+                        continue;
+                    }
+                    // The very last buffer, which exceeds the limit
+                    int copyCount = outputLimit - totalCount;
+                    outFrameAppender.reset(outFrame, true);
+                    for (int i = 0; i < copyCount; i++) {
+                        if (!outFrameAppender.append(fta, i)) {
+                            throw new IllegalStateException();
+                        }
+                        totalCount++;
+                    }
+                }
+
+                if (outFrameAppender.getTupleCount() > 0) {
+                    FrameUtils.flushFrame(outFrame, writer);
+                    outFrameAppender.reset(outFrame, true);
+                }
+
+                return;
+            }
+
+            // More than one run, actual merging is needed
+            inFrames = new ArrayList<ByteBuffer>();
+            for (int i = 0; i < framesLimit - 1; ++i) {
+                inFrames.add(ctx.allocateFrame());
+            }
+            while (runs.size() > 0) {
+                try {
+                    doPassWithReplacementSelection(runs);
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        } catch (Exception e) {
+            writer.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+        }
+    }
+
+    // creates a new run from runs that can fit in memory.
+    private void doPassWithReplacementSelection(List<IFrameReader> runs) throws HyracksDataException {
+        FileReference newRun = null;
+        IFrameWriter writer = this.writer;
+        boolean finalPass = false;
+        if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
+            finalPass = true;
+            for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+                inFrames.remove(i);
+            }
+        } else {
+            newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
+            writer = new RunFileWriter(newRun, ctx.getIOManager());
+            writer.open();
+        }
+        try {
+            IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
+            for (int i = 0; i < inFrames.size(); i++) {
+                runCursors[i] = runs.get(i);
+            }
+            RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
+                    comparators, recordDesc);
+            merger.open();
+            try {
+                while (merger.nextFrame(outFrame)) {
+                    if (outputLimit > 0 && finalPass) {
+                        outFrameAccessor.reset(outFrame);
+                        int count = outFrameAccessor.getTupleCount();
+                        if ((currentSize + count) > outputLimit) {
+                            ByteBuffer b = ctx.allocateFrame();
+                            FrameTupleAppender partialAppender = new FrameTupleAppender(ctx.getFrameSize());
+                            partialAppender.reset(b, true);
+                            int copyCount = outputLimit - currentSize;
+                            for (int i = 0; i < copyCount; i++) {
+                                partialAppender.append(outFrameAccessor, i);
+                                currentSize++;
+                            }
+                            FrameUtils.makeReadable(b);
+                            FrameUtils.flushFrame(b, writer);
+                            break;
+                        } else {
+                            FrameUtils.flushFrame(outFrame, writer);
+                            currentSize += count;
+                        }
+                    } else {
+                        FrameUtils.flushFrame(outFrame, writer);
+                    }
+                }
+            } finally {
+                merger.close();
+            }
+
+            if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
+                runs.clear();
+                return;
+            }
+
+            runs.subList(0, inFrames.size()).clear();
+            if (!finalPass) {
+                runs.add(0, ((RunFileWriter) writer).createReader());
+            }
+        } finally {
+            if (!finalPass) {
+                writer.close();
+            }
+        }
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
index b45c5f9..1447fa6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
@@ -8,7 +8,6 @@
 
 /**
  * @author pouria
- * 
  *         Defines the required operations, needed for any memory manager, used
  *         in sorting with replacement selection, to manage the free spaces
  */
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
index 7568e26..69cf8b1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -7,7 +7,6 @@
 
 /**
  * @author pouria
- * 
  *         Interface for the Run Generator
  */
 public interface IRunGenerator extends IFrameWriter {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
index 73407f3..a7d6c08 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
@@ -2,7 +2,6 @@
 
 /**
  * @author pouria
- * 
  *         Defines the selection tree, used in sorting with replacement
  *         selection to manage the order of output tuples into the runs, during
  *         the run generation phase. This tree contains tuples, belonging to two
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index 2f1c94c..e283c33 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -29,16 +29,13 @@
 
 /**
  * @author pouria
- * 
  *         Operator descriptor for sorting with replacement, consisting of two
  *         phases:
- * 
  *         - Run Generation: Denoted by OptimizedSortActivity below, in which
  *         sort runs get generated from the input data. This phases uses the
  *         Selection Tree and Memory Manager to benefit from the replacement
  *         selection optimization, to create runs which are longer than the
  *         available memory size.
- * 
  *         - Merging: Denoted by MergeActivity below, in which runs (generated
  *         in the previous phase) get merged via a merger. Each run has a single
  *         buffer in memory, and a priority queue is used to select the top
@@ -191,10 +188,10 @@
                         comparators[i] = comparatorFactories[i].createBinaryComparator();
                     }
 
-                    OptimizedExternalSortRunMerger merger = new OptimizedExternalSortRunMerger(ctx, outputLimit, runs,
-                            sortFields, comparators, recordDescriptors[0], memSize, writer);
+                    ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, outputLimit, runs, sortFields,
+                            comparators, recordDescriptors[0], memSize, writer);
 
-                    merger.process();
+                    merger.processWithReplacementSelection();
 
                 }
             };
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
index 1430e6f..8b9e4fe 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -21,41 +20,31 @@
 
 /**
  * @author pouria
- * 
  *         This class implements the run generator for sorting with replacement
  *         selection, where there is no limit on the output, i.e. the whole data
  *         should be sorted. A SortMinHeap is used as the selectionTree to
  *         decide the order of writing tuples into the runs, while memory
  *         manager is based on a binary search tree to allocate tuples in the
  *         memory.
- * 
  *         The overall process is as follows: - Read the input data frame by
  *         frame. For each tuple T in the current frame:
- * 
  *         - Try to allocate a memory slot for writing T along with the attached
  *         header/footer (for memory management purpose)
- * 
  *         - If T can not be allocated, try to output as many tuples, currently
  *         resident in memory, as needed so that a free slot, large enough to
  *         hold T, gets created. MinHeap decides about which tuple should be
  *         sent to the output at each step.
- * 
  *         - Write T into the memory
- * 
  *         - Calculate the runID of T (based on the last output tuple for the
  *         current run). It is either the current run or the next run. Also
  *         calculate Poorman's Normalized Key (PNK) for T, to make comparisons
  *         faster later.
- * 
  *         - Create a heap element for T, containing: its runID, the slot
  *         pointer to its memory location, and its PNK.
- * 
  *         - Insert the created heap element into the heap
- * 
  *         - Upon closing, write all the tuples, currently resident in memory,
  *         into their corresponding run(s). Again min heap decides about which
  *         tuple is the next for output.
- * 
  *         OptimizedSortOperatorDescriptor will merge the generated runs, to
  *         generate the final sorted output of the data.
  */
@@ -63,7 +52,7 @@
     private final IHyracksTaskContext ctx;
     private final int[] sortFields;
     private final INormalizedKeyComputer nkc;
-    IBinaryComparatorFactory[] comparatorFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
     private final IBinaryComparator[] comparators;
     private final RecordDescriptor recordDescriptor;
     private final List<IFrameReader> runs;
@@ -90,7 +79,7 @@
                                 // the selectionTree to output
     private int[] sTreeTop;
 
-    RunFileWriter writer;
+    private RunFileWriter writer;
 
     private boolean newRun;
     private int curRunId;
@@ -148,7 +137,6 @@
                     }
                 }
                 memMgr.allocate(tLength, allocationPtr);
-                ((BSTMemMgr) memMgr).debugDecLookupCount();
             }
             memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
             int runId = getRunId(inputAccessor, i);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
index f69f676..50c9c9c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -21,47 +20,34 @@
 
 /**
  * @author pouria
- * 
  *         This class implements the run generator for sorting with replacement
  *         selection, where there is a limit on the output, i.e. we are looking
  *         for top-k tuples (first k smallest tuples w.r.t sorting keys).
- * 
  *         A SortMinMaxHeap is used as the selectionTree to decide the order of
  *         writing tuples into the runs, and also to prune tuples (if possible).
  *         Memory manager is based on a binary search tree and is used to
  *         allocate memory slots for tuples.
- * 
  *         The overall process is as follows (Assuming that the limit is K):
- * 
  *         - Read the input data frame by frame. For each tuple T in the current
  *         frame:
- * 
  *         - If currentRun R has reached the limit of K on the size, and (T >
  *         maximum tuple of R), then ignore T.
- * 
  *         - Otherwise, try to allocate a memory slot for writing T along with
  *         the attached header/footer (for memory management purpose)
- * 
  *         - If T can not be allocated, try to output as many tuples, currently
  *         resident in memory, as needed so that a free slot, large enough to
  *         hold T, gets created. MinMaxHeap decides about which tuple should be
  *         sent to the output at each step.
- * 
  *         - Write T into memory.
- * 
  *         - Calculate the runID of T (based on the last output tuple for the
  *         current run). It is either the current run or the next run. Also
  *         calculate Poorman's Normalized Key (PNK) for T, to make comparisons
  *         faster later.
- * 
  *         - Create an heap element for T, containing its runID, the slot ptr to
  *         its memory location, and its PNK.
- * 
  *         - If runID is the nextRun, insert the heap element into the heap, and
  *         increment the size of nextRun.
- * 
  *         - If runID is the currentRun, then:
- * 
  *         - If currentRun has not hit the limit of k, insert the element into
  *         the heap, and increase currentRun size. - Otherwise, currentRun has
  *         hit the limit of K, while T is less than the max. So discard the
@@ -69,10 +55,8 @@
  *         unallocating its memory location) and insert the heap element into
  *         the heap. No need to change the currentRun size as we are replacing
  *         an old element (the old max) with T.
- * 
  *         - Upon closing, write all the tuples, currently resident in memory,
  *         into their corresponding run(s).
- * 
  *         - Note that upon opening a new Run R, if size of R (based on stats)
  *         is S and (S > K), then (S-K) current maximum tuples of R (which are
  *         resident in memory) get discarded at the beginning. MinMax heap can
@@ -83,7 +67,7 @@
     private final IHyracksTaskContext ctx;
     private final int[] sortFields;
     private final INormalizedKeyComputer nkc;
-    IBinaryComparatorFactory[] comparatorFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
     private final IBinaryComparator[] comparators;
     private final RecordDescriptor recordDescriptor;
     private final List<IFrameReader> runs;
@@ -114,7 +98,7 @@
     private Slot discard;
     private int[] sTreeTop;
     private int[] peek;
-    RunFileWriter writer;
+    private RunFileWriter writer;
     private boolean newRun;
     private int curRunId;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
index 2ceea17..aaa9049 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
@@ -2,15 +2,12 @@
 
 /**
  * @author pouria
- * 
  *         Defines a slot in the memory, which can be a free or used (allocated)
  *         slot. Memory is a set of frames, ordered as a list. Each tuple is
  *         stored in a slot, where the location of the slot is denoted by a pair
  *         of integers:
- * 
  *         - The index of the frame, in the list of frames in memory. (referred
  *         to as frameIx)
- * 
  *         - The starting offset of the slot, within that specific frame.
  *         (referred to as offset)
  */
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
index 383fba8..a221118 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
@@ -1,8 +1,7 @@
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Arrays;
 
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -11,21 +10,16 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 /**
- * 
  * @author pouria
- * 
  *         Implements a minimum binary heap, used as selection tree, for sort
  *         with replacement. This heap structure can only be used as the min
  *         heap (no access to the max element). Elements in the heap are
  *         compared based on their run numbers, and sorting key(s):
- * 
  *         Considering two heap elements A and B:
- * 
  *         if RunNumber(A) > RunNumber(B) then A is larger than B if
  *         RunNumber(A) == RunNumber(B), then A is smaller than B, if and only
  *         if the value of the sort key(s) in B is greater than A (based on the
  *         sort comparator).
- * 
  */
 public class SortMinHeap implements ISelectionTree {
 
@@ -33,17 +27,18 @@
     static final int FRAME_IX = 1;
     static final int OFFSET_IX = 2;
     private static final int PNK_IX = 3;
+    private static final int ELEMENT_SIZE = 4;
+    private static final int INIT_ARRAY_SIZE = 512;
 
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
     private final RecordDescriptor recordDescriptor;
     private final FrameTupleAccessor fta1;
     private final FrameTupleAccessor fta2;
-
-    List<int[]> tree;
-    IMemoryManager memMgr;
-    int[] top; // Used as the temp variable to access the top, to avoid object
-               // creation
+    private int[] elements;
+    private int nextIx;
+    private final IMemoryManager memMgr;
+    private int[] top; // Used as a temp variable to access the top, to avoid object creation
 
     public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, IMemoryManager memMgr) {
@@ -56,8 +51,11 @@
         fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         this.memMgr = memMgr;
-
-        this.tree = new ArrayList<int[]>();
+        this.top = new int[ELEMENT_SIZE];
+        Arrays.fill(top, -1);
+        this.elements = new int[INIT_ARRAY_SIZE];
+        Arrays.fill(elements, -1);
+        this.nextIx = 0;
     }
 
     /*
@@ -65,7 +63,7 @@
      */
     @Override
     public void getMin(int[] result) {
-        if (tree.size() == 0) {
+        if (nextIx == 0) {
             result[0] = result[1] = result[2] = result[3] = -1;
             return;
         }
@@ -78,49 +76,56 @@
 
     @Override
     public void peekMin(int[] result) {
-        if (tree.size() == 0) {
+        if (nextIx == 0) {
             result[0] = result[1] = result[2] = result[3] = -1;
             return;
         }
-
-        top = tree.get(0);
-        for (int i = 0; i < top.length; i++) {
-            result[i] = top[i];
+        for (int i = 0; i < ELEMENT_SIZE; i++) {
+            result[i] = elements[i];
         }
     }
 
     @Override
     public void insert(int[] e) {
-        tree.add(e);
-        siftUp(tree.size() - 1);
+        if (nextIx >= elements.length) {
+            elements = Arrays.copyOf(elements, elements.length * 2);
+        }
+        for (int i = 0; i < ELEMENT_SIZE; i++) {
+            elements[nextIx + i] = e[i];
+        }
+        siftUp(nextIx);
+        nextIx += ELEMENT_SIZE;
+
     }
 
     @Override
     public void reset() {
-        this.tree.clear();
+        Arrays.fill(elements, -1);
+        nextIx = 0;
     }
 
     @Override
     public boolean isEmpty() {
-        return (tree.size() < 1);
+        return (nextIx < ELEMENT_SIZE);
     }
 
     public int _debugGetSize() {
-        return tree.size();
+        return (nextIx > 0 ? (nextIx - 1) / 4 : 0);
     }
 
     private int[] delete(int nix) {
-        int[] nv = tree.get(nix);
-        int[] last = tree.remove(tree.size() - 1);
+        int[] nv = Arrays.copyOfRange(elements, nix, nix + ELEMENT_SIZE);
+        int[] lastElem = removeLast();
 
-        if (tree.size() > 0) {
-            tree.set(nix, last);
-        } else {
+        if (nextIx == 0) {
             return nv;
         }
 
+        for (int i = 0; i < ELEMENT_SIZE; i++) {
+            elements[nix + i] = lastElem[i];
+        }
         int pIx = getParent(nix);
-        if (pIx > -1 && (compare(last, tree.get(pIx)) < 0)) {
+        if (pIx > -1 && (compare(lastElem, Arrays.copyOfRange(elements, pIx, pIx + ELEMENT_SIZE)) < 0)) {
             siftUp(nix);
         } else {
             siftDown(nix);
@@ -128,6 +133,16 @@
         return nv;
     }
 
+    private int[] removeLast() {
+        if (nextIx < ELEMENT_SIZE) { //this is the very last element
+            return new int[] { -1, -1, -1, -1 };
+        }
+        int[] l = Arrays.copyOfRange(elements, nextIx - ELEMENT_SIZE, nextIx);
+        Arrays.fill(elements, nextIx - ELEMENT_SIZE, nextIx, -1);
+        nextIx -= ELEMENT_SIZE;
+        return l;
+    }
+
     private void siftUp(int nodeIx) {
         int p = getParent(nodeIx);
         if (p < 0) {
@@ -160,8 +175,8 @@
 
     // first < sec : -1
     private int compare(int nodeSIx1, int nodeSIx2) {
-        int[] n1 = tree.get(nodeSIx1);
-        int[] n2 = tree.get(nodeSIx2);
+        int[] n1 = Arrays.copyOfRange(elements, nodeSIx1, nodeSIx1 + ELEMENT_SIZE);
+        int[] n2 = Arrays.copyOfRange(elements, nodeSIx2, nodeSIx2 + ELEMENT_SIZE);
         return (compare(n1, n2));
     }
 
@@ -220,24 +235,30 @@
         return ((compare(lix, rix) < 0) ? lix : rix);
     }
 
+    //Assumption: n1Ix and n2Ix are starting indices of two elements
     private void swap(int n1Ix, int n2Ix) {
-        int[] temp = tree.get(n1Ix);
-        tree.set(n1Ix, tree.get(n2Ix));
-        tree.set(n2Ix, temp);
+        int[] temp = Arrays.copyOfRange(elements, n1Ix, n1Ix + ELEMENT_SIZE);
+        for (int i = 0; i < ELEMENT_SIZE; i++) {
+            elements[n1Ix + i] = elements[n2Ix + i];
+            elements[n2Ix + i] = temp[i];
+        }
     }
 
     private int getLeftChild(int ix) {
-        int lix = 2 * ix + 1;
-        return ((lix < tree.size()) ? lix : -1);
+        int lix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + ELEMENT_SIZE;
+        return ((lix < nextIx) ? lix : -1);
     }
 
     private int getRightChild(int ix) {
-        int rix = 2 * ix + 2;
-        return ((rix < tree.size()) ? rix : -1);
+        int rix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
+        return ((rix < nextIx) ? rix : -1);
     }
 
     private int getParent(int ix) {
-        return ((ix - 1) / 2);
+        if (ix <= 0) {
+            return -1;
+        }
+        return ((ix - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
     }
 
     private ByteBuffer getFrame(int frameIx) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
index c040ccc..dcaf781 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
@@ -1,8 +1,7 @@
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Arrays;
 
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -12,7 +11,6 @@
 
 /**
  * @author pouria
- * 
  *         Implements a MinMax binary heap, used as the selection tree, in
  *         sorting with replacement. Check SortMinHeap for details on comparing
  *         elements.
@@ -23,6 +21,8 @@
     static final int OFFSET_IX = 2;
     private static final int PNK_IX = 3;
     private static final int NOT_EXIST = -1;
+    private static final int ELEMENT_SIZE = 4;
+    private static final int INIT_ARRAY_SIZE = 512;
 
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
@@ -30,11 +30,10 @@
     private final FrameTupleAccessor fta1;
     private final FrameTupleAccessor fta2;
 
-    List<int[]> tree;
-    IMemoryManager memMgr;
+    private int[] elements;
+    private int nextIx;
 
-    int[] top; // Used as the temp variable to access the top, to avoid object
-               // creation
+    private final IMemoryManager memMgr;
 
     public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, IMemoryManager memMgr) {
@@ -47,124 +46,143 @@
         fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         this.memMgr = memMgr;
-
-        this.tree = new ArrayList<int[]>();
+        this.elements = new int[INIT_ARRAY_SIZE];
+        Arrays.fill(elements, -1);
+        this.nextIx = 0;
     }
 
     @Override
     public void insert(int[] element) {
-        tree.add(element);
-        bubbleUp(tree.size() - 1);
+        if (nextIx >= elements.length) {
+            elements = Arrays.copyOf(elements, elements.length * 2);
+        }
+        for (int i = 0; i < ELEMENT_SIZE; i++) {
+            elements[nextIx + i] = element[i];
+        }
+        nextIx += ELEMENT_SIZE;
+        bubbleUp(nextIx - ELEMENT_SIZE);
     }
 
     @Override
     public void getMin(int[] result) {
-        if (tree.size() == 0) {
+        if (nextIx == 0) {
             result[0] = result[1] = result[2] = result[3] = -1;
             return;
         }
 
-        top = delete(0);
-        for (int i = 0; i < top.length; i++) {
-            result[i] = top[i];
+        int[] topElem = delete(0);
+        for (int x = 0; x < ELEMENT_SIZE; x++) {
+            result[x] = topElem[x];
         }
     }
 
     @Override
     public void reset() {
-        this.tree.clear();
+        Arrays.fill(elements, -1);
+        nextIx = 0;
     }
 
     @Override
     public boolean isEmpty() {
-        return (tree.size() < 1);
+        return (nextIx < ELEMENT_SIZE);
     }
 
     @Override
     public void peekMin(int[] result) {
-        if (tree.size() == 0) {
+        if (nextIx == 0) {
             result[0] = result[1] = result[2] = result[3] = -1;
             return;
         }
 
-        top = tree.get(0);
-        for (int i = 0; i < top.length; i++) {
-            result[i] = top[i];
+        for (int x = 0; x < ELEMENT_SIZE; x++) {
+            result[x] = elements[x];
         }
     }
 
     @Override
     public void getMax(int[] result) {
-        if (tree.size() == 1) {
-            top = tree.remove(0);
-            for (int i = 0; i < top.length; i++) {
-                result[i] = top[i];
+        if (nextIx == ELEMENT_SIZE) {
+            int[] topElement = removeLast();
+            for (int x = 0; x < ELEMENT_SIZE; x++) {
+                result[x] = topElement[x];
             }
             return;
         }
 
-        if (tree.size() > 1) {
+        if (nextIx > ELEMENT_SIZE) {
             int lc = getLeftChild(0);
             int rc = getRightChild(0);
+            int maxIx = lc;
 
-            if (rc == -1) {
-                top = delete(lc);
-            } else {
-                top = (compare(lc, rc) < 0) ? delete(rc) : delete(lc);
+            if (rc != -1) {
+                maxIx = compare(lc, rc) < 0 ? rc : lc;
             }
 
-            for (int i = 0; i < top.length; i++) {
-                result[i] = top[i];
+            int[] maxElem = delete(maxIx);
+            for (int x = 0; x < ELEMENT_SIZE; x++) {
+                result[x] = maxElem[x];
             }
             return;
-
         }
+
         result[0] = result[1] = result[2] = result[3] = -1;
+
     }
 
     @Override
     public void peekMax(int[] result) {
-        if (tree.size() == 1) {
-            top = tree.get(0);
-            for (int i = 0; i < top.length; i++) {
-                result[i] = top[i];
+        if (nextIx == ELEMENT_SIZE) {
+            for (int i = 0; i < ELEMENT_SIZE; i++) {
+                result[i] = elements[i];
             }
             return;
         }
-        if (tree.size() > 1) {
+        if (nextIx > ELEMENT_SIZE) {
             int lc = getLeftChild(0);
             int rc = getRightChild(0);
+            int maxIx = lc;
 
-            if (rc == -1) {
-                top = tree.get(lc);
-            } else {
-                top = (compare(lc, rc) < 0) ? tree.get(rc) : tree.get(lc);
+            if (rc != -1) {
+                maxIx = compare(lc, rc) < 0 ? rc : lc;
             }
 
-            for (int i = 0; i < top.length; i++) {
-                result[i] = top[i];
+            for (int x = 0; x < ELEMENT_SIZE; x++) {
+                result[x] = elements[maxIx + x];
             }
+
             return;
         }
         result[0] = result[1] = result[2] = result[3] = -1;
     }
 
     private int[] delete(int delIx) {
-        int s = tree.size();
-        if (s > 1) {
-            int[] delEntry = tree.get(delIx);
-            int[] last = (tree.remove(s - 1));
-            if (delIx != tree.size()) {
-                tree.set(delIx, last);
+        int s = nextIx;
+        if (nextIx > ELEMENT_SIZE) {
+            int[] delEntry = Arrays.copyOfRange(elements, delIx, delIx + ELEMENT_SIZE);
+            int[] last = removeLast();
+            if (delIx != (s - ELEMENT_SIZE)) {
+                for (int x = 0; x < ELEMENT_SIZE; x++) {
+                    elements[delIx + x] = last[x];
+                }
                 trickleDown(delIx);
             }
             return delEntry;
-        } else if (s == 1) {
-            return tree.remove(0);
+        } else if (nextIx == ELEMENT_SIZE) {
+            return (removeLast());
         }
         return null;
     }
 
+    private int[] removeLast() {
+        if (nextIx < ELEMENT_SIZE) { //this is the very last element
+            return new int[] { -1, -1, -1, -1 };
+        }
+        int[] l = Arrays.copyOfRange(elements, nextIx - ELEMENT_SIZE, nextIx);
+        Arrays.fill(elements, nextIx - ELEMENT_SIZE, nextIx, -1);
+        nextIx -= ELEMENT_SIZE;
+        return l;
+    }
+
     private void bubbleUp(int ix) {
         int p = getParentIx(ix);
         if (isAtMinLevel(ix)) {
@@ -301,16 +319,18 @@
     }
 
     private void swap(int n1Ix, int n2Ix) {
-        int[] temp = tree.get(n1Ix);
-        tree.set(n1Ix, tree.get(n2Ix));
-        tree.set(n2Ix, temp);
+        int[] temp = Arrays.copyOfRange(elements, n1Ix, n1Ix + ELEMENT_SIZE);
+        for (int i = 0; i < ELEMENT_SIZE; i++) {
+            elements[n1Ix + i] = elements[n2Ix + i];
+            elements[n2Ix + i] = temp[i];
+        }
     }
 
     private int getParentIx(int i) {
-        if (i == 0) {
+        if (i < ELEMENT_SIZE) {
             return NOT_EXIST;
         }
-        return (i - 1) / 2;
+        return ((i - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
     }
 
     private int getGrandParent(int i) {
@@ -319,8 +339,8 @@
     }
 
     private int getLeftChild(int i) {
-        int lc = 2 * i + 1;
-        return lc < tree.size() ? lc : NOT_EXIST;
+        int lc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + ELEMENT_SIZE;
+        return (lc < nextIx ? lc : -1);
     }
 
     private int[] getLeftGrandChildren(int i) {
@@ -329,8 +349,8 @@
     }
 
     private int getRightChild(int i) {
-        int rc = 2 * i + 2;
-        return rc < tree.size() ? rc : -1;
+        int rc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
+        return (rc < nextIx ? rc : -1);
     }
 
     private int[] getRightGrandChildren(int i) {
@@ -344,11 +364,13 @@
     }
 
     private int getLevel(int i) {
-        if (i == 0) {
+        if (i < ELEMENT_SIZE) {
             return 0;
         }
-        int l = (int) Math.floor(Math.log(i) / Math.log(2));
-        if (i == (((int) Math.pow(2, (l + 1))) - 1)) {
+
+        int cnv = i / ELEMENT_SIZE;
+        int l = (int) Math.floor(Math.log(cnv) / Math.log(2));
+        if (cnv == (((int) Math.pow(2, (l + 1))) - 1)) {
             return (l + 1);
         }
         return l;
@@ -360,8 +382,8 @@
 
     // first < sec : -1
     private int compare(int nodeSIx1, int nodeSIx2) {
-        int[] n1 = tree.get(nodeSIx1);
-        int[] n2 = tree.get(nodeSIx2);
+        int[] n1 = Arrays.copyOfRange(elements, nodeSIx1, nodeSIx1 + ELEMENT_SIZE); //tree.get(nodeSIx1);
+        int[] n2 = Arrays.copyOfRange(elements, nodeSIx2, nodeSIx2 + ELEMENT_SIZE); //tree.get(nodeSIx2);
         return (compare(n1, n2));
     }
 
@@ -407,13 +429,4 @@
         }
         return 0;
     }
-
-    public String _debugPrintTree() {
-        String s = "";
-        for (int i = 0; i < tree.size(); i++) {
-            int[] n = tree.get(i);
-            s += "\t[" + i + "](" + n[RUN_ID_IX] + ", " + n[FRAME_IX] + ", " + n[OFFSET_IX] + "), ";
-        }
-        return s;
-    }
 }
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index c7aa1a1..91214c8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -90,6 +90,8 @@
         runTest(spec);
     }
 	
+	
+	
 	@Test
     public void optimizedSortMergeTest02() throws Exception {
         JobSpecification spec = new JobSpecification();
@@ -113,7 +115,7 @@
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
         
-        int outputLimit = 15;
+        int outputLimit = 200;
         OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
                         UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
@@ -137,4 +139,5 @@
 
         runTest(spec);
     }
+    
 }
\ No newline at end of file