Adding Optimization for the merge step in external sort

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@965 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 98621f5..59cf056 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -53,7 +53,7 @@
     private ByteBuffer outFrame;
     private FrameTupleAppender outFrameAppender;
 
-    private final FrameSorter frameSorter; //Used in External sort, no replacement selection
+    private FrameSorter frameSorter; //Used in External sort, no replacement selection
     private FrameTupleAccessor outFrameAccessor; //Used in External sort, with replacement selection
     private final int outputLimit; //Used in External sort, with replacement selection and limit on output size
     private int currentSize; //Used in External sort, with replacement selection and limit on output size
@@ -95,7 +95,15 @@
                 if (frameSorter != null && frameSorter.getFrameCount() > 0) {
                     frameSorter.flushFrames(writer);
                 }
+                /** recycle sort buffer */
+                frameSorter = null;
+                System.gc();
+
             } else {
+            	/** recycle sort buffer */
+                frameSorter = null;
+                System.gc();
+
                 inFrames = new ArrayList<ByteBuffer>();
                 outFrame = ctx.allocateFrame();
                 outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
@@ -162,15 +170,12 @@
 
     public void processWithReplacementSelection() throws HyracksDataException {
         writer.open();
-
         try {
             outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
             outFrame = ctx.allocateFrame();
             outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
             outFrameAppender.reset(outFrame, true);
-
             if (runs.size() == 1) {
-
                 if (outputLimit < 1) {
                     runs.get(0).open();
                     ByteBuffer nextFrame = ctx.allocateFrame();
@@ -178,9 +183,10 @@
                         FrameUtils.flushFrame(nextFrame, writer);
                         outFrameAppender.reset(nextFrame, true);
                     }
+                    System.gc();
                     return;
                 }
-
+                //Limit on the output size
                 int totalCount = 0;
                 runs.get(0).open();
                 FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
@@ -203,16 +209,15 @@
                         totalCount++;
                     }
                 }
-
                 if (outFrameAppender.getTupleCount() > 0) {
                     FrameUtils.flushFrame(outFrame, writer);
                     outFrameAppender.reset(outFrame, true);
                 }
-
+                System.gc();
                 return;
             }
-
             // More than one run, actual merging is needed
+            System.gc();
             inFrames = new ArrayList<ByteBuffer>();
             for (int i = 0; i < framesLimit - 1; ++i) {
                 inFrames.add(ctx.allocateFrame());
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index e283c33..6b73c54 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -187,9 +187,10 @@
                     for (int i = 0; i < comparatorFactories.length; ++i) {
                         comparators[i] = comparatorFactories[i].createBinaryComparator();
                     }
-
+                    
+                    int necessaryFrames = Math.min(runs.size() + 2, memSize);
                     ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, outputLimit, runs, sortFields,
-                            comparators, recordDescriptors[0], memSize, writer);
+                            comparators, recordDescriptors[0], necessaryFrames, writer);
 
                     merger.processWithReplacementSelection();