[ASTERIXDB-2552][RT] Refactor runs generator and merger

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Moved the writer and sorter out of the merger to allow
micro external sort to use the run generator and merger
plus minor clean-ups.

Change-Id: Idda31c92cbcddba5ebef8bbbf7855b9c8293dd51
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3363
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
index 364f1c7..cb2da4c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
@@ -32,7 +32,6 @@
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
-import org.apache.hyracks.dataflow.std.sort.ISorter;
 
 /**
  * Group-by aggregation is pushed into multi-pass merge of external sort.
@@ -44,28 +43,23 @@
     private final RecordDescriptor inputRecordDesc;
     private final RecordDescriptor partialAggRecordDesc;
     private final RecordDescriptor outRecordDesc;
-
     private final int[] groupFields;
     private final IAggregatorDescriptorFactory mergeAggregatorFactory;
     private final IAggregatorDescriptorFactory partialAggregatorFactory;
     private final boolean localSide;
-
     private final int[] mergeSortFields;
     private final int[] mergeGroupFields;
     private final IBinaryComparator[] groupByComparators;
 
-    public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, ISorter frameSorter, List<GeneratedRunFileReader> runs,
-            int[] sortFields, RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc,
-            RecordDescriptor outRecordDesc, int framesLimit, IFrameWriter writer, int[] groupFields,
-            INormalizedKeyComputer nmk, IBinaryComparator[] comparators,
+    public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, int[] sortFields,
+            RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc, RecordDescriptor outRecordDesc,
+            int framesLimit, int[] groupFields, INormalizedKeyComputer nmk, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory partialAggregatorFactory, IAggregatorDescriptorFactory aggregatorFactory,
             boolean localStage) {
-        super(ctx, frameSorter, runs, comparators, nmk, partialAggRecordDesc, framesLimit, writer);
-
+        super(ctx, runs, comparators, nmk, partialAggRecordDesc, framesLimit);
         this.inputRecordDesc = inRecordDesc;
         this.partialAggRecordDesc = partialAggRecordDesc;
         this.outRecordDesc = outRecordDesc;
-
         this.groupFields = groupFields;
         this.mergeAggregatorFactory = aggregatorFactory;
         this.partialAggregatorFactory = partialAggregatorFactory;
@@ -93,11 +87,10 @@
     }
 
     @Override
-    protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+    public IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
         IAggregatorDescriptorFactory aggregatorFactory = localSide ? partialAggregatorFactory : mergeAggregatorFactory;
-        boolean outputPartial = false;
         return new PreclusteredGroupWriter(ctx, groupFields, groupByComparators, aggregatorFactory, inputRecordDesc,
-                outRecordDesc, nextWriter, outputPartial);
+                outRecordDesc, nextWriter, false);
     }
 
     @Override
@@ -110,16 +103,14 @@
     protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
             throws HyracksDataException {
         IAggregatorDescriptorFactory aggregatorFactory = localSide ? mergeAggregatorFactory : partialAggregatorFactory;
-        boolean outputPartial = true;
         return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregatorFactory,
-                partialAggRecordDesc, partialAggRecordDesc, mergeFileWriter, outputPartial);
+                partialAggRecordDesc, partialAggRecordDesc, mergeFileWriter, true);
     }
 
     @Override
-    protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
-        boolean outputPartial = false;
+    public IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
         return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, mergeAggregatorFactory,
-                partialAggRecordDesc, outRecordDesc, nextWriter, outputPartial);
+                partialAggRecordDesc, outRecordDesc, nextWriter, false);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 23e47f0..7ca01a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -20,7 +20,6 @@
 
 import java.util.List;
 
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -37,7 +36,6 @@
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
-import org.apache.hyracks.dataflow.std.sort.ISorter;
 
 /**
  * This Operator pushes group-by aggregation into the external sort.
@@ -158,13 +156,12 @@
 
             @Override
             protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
-                    IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
-                    List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators,
-                    INormalizedKeyComputer nmkComputer, int necessaryFrames) {
-                return new ExternalSortGroupByRunMerger(ctx, sorter, runs, sortFields,
+                    IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs,
+                    IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+                return new ExternalSortGroupByRunMerger(ctx, runs, sortFields,
                         recordDescProvider.getInputRecordDescriptor(new ActivityId(odId, SORT_ACTIVITY_ID), 0),
-                        partialAggRecordDesc, outputRecordDesc, necessaryFrames, writer, groupFields, nmkComputer,
-                        comparators, partialAggregatorFactory, mergeAggregatorFactory, !finalStage);
+                        partialAggRecordDesc, outputRecordDesc, necessaryFrames, groupFields, nmkComputer, comparators,
+                        partialAggregatorFactory, mergeAggregatorFactory, !finalStage);
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index 0bead97..e860288 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -43,8 +43,6 @@
 public abstract class AbstractExternalSortRunMerger {
 
     protected final IHyracksTaskContext ctx;
-    protected final IFrameWriter writer;
-
     private final List<GeneratedRunFileReader> runs;
     private final BitSet currentGenerationRunAvailable;
     private final IBinaryComparator[] comparators;
@@ -54,136 +52,93 @@
     private final int topK;
     private List<GroupVSizeFrame> inFrames;
     private VSizeFrame outputFrame;
-    private ISorter sorter;
-
     private static final Logger LOGGER = LogManager.getLogger();
 
-    public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<GeneratedRunFileReader> runs,
+    public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
             IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
-            int framesLimit, IFrameWriter writer) {
-        this(ctx, sorter, runs, comparators, nmkComputer, recordDesc, framesLimit, Integer.MAX_VALUE, writer);
+            int framesLimit) {
+        this(ctx, runs, comparators, nmkComputer, recordDesc, framesLimit, Integer.MAX_VALUE);
     }
 
-    public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<GeneratedRunFileReader> runs,
+    AbstractExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
             IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
-            int framesLimit, int topK, IFrameWriter writer) {
+            int framesLimit, int topK) {
         this.ctx = ctx;
-        this.sorter = sorter;
         this.runs = new LinkedList<>(runs);
         this.currentGenerationRunAvailable = new BitSet(runs.size());
         this.comparators = comparators;
         this.nmkComputer = nmkComputer;
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
-        this.writer = writer;
         this.topK = topK;
     }
 
-    public void process() throws HyracksDataException {
-        IFrameWriter finalWriter = null;
+    public void process(IFrameWriter finalWriter) throws HyracksDataException {
         try {
-            if (runs.isEmpty()) {
-                finalWriter = prepareSkipMergingFinalResultWriter(writer);
-                finalWriter.open();
-                if (sorter != null) {
-                    try {
-                        if (sorter.hasRemaining()) {
-                            sorter.flush(finalWriter);
-                        }
-                    } finally {
-                        sorter.close();
-                    }
-                }
-            } else {
-                /** recycle sort buffer */
-                if (sorter != null) {
-                    sorter.close();
-                }
+            int maxMergeWidth = framesLimit - 1;
+            inFrames = new ArrayList<>(maxMergeWidth);
+            outputFrame = new VSizeFrame(ctx);
+            List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth);
+            int stop = runs.size();
+            currentGenerationRunAvailable.set(0, stop);
+            int numberOfPasses = 1;
+            while (true) {
+                int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
+                        currentGenerationRunAvailable, stop);
+                prepareFrames(unUsed, inFrames, partialRuns);
 
-                finalWriter = prepareFinalMergeResultWriter(writer);
-                finalWriter.open();
-
-                int maxMergeWidth = framesLimit - 1;
-
-                inFrames = new ArrayList<>(maxMergeWidth);
-                outputFrame = new VSizeFrame(ctx);
-                List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth);
-
-                int stop = runs.size();
-                currentGenerationRunAvailable.set(0, stop);
-                int numberOfPasses = 1;
-                while (true) {
-
-                    int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
-                            currentGenerationRunAvailable, stop);
-                    prepareFrames(unUsed, inFrames, partialRuns);
-
-                    if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
-                        GeneratedRunFileReader reader;
-                        if (partialRuns.size() == 1) {
-                            if (!currentGenerationRunAvailable.isEmpty()) {
-                                throw new HyracksDataException(
-                                        "The record is too big to put into the merging frame, please"
-                                                + " allocate more sorting memory");
-                            } else {
-                                reader = partialRuns.get(0);
-                            }
-
+                if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
+                    GeneratedRunFileReader reader;
+                    if (partialRuns.size() == 1) {
+                        if (!currentGenerationRunAvailable.isEmpty()) {
+                            throw new HyracksDataException("The record is too big to put into the merging frame, please"
+                                    + " allocate more sorting memory");
                         } else {
-                            RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
-                            IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
-
-                            try {
-                                mergeResultWriter.open();
-                                merge(mergeResultWriter, partialRuns);
-                            } catch (Throwable t) {
-                                mergeResultWriter.fail();
-                                throw t;
-                            } finally {
-                                mergeResultWriter.close();
-                            }
-                            reader = mergeFileWriter.createReader();
-                        }
-                        runs.add(reader);
-
-                        if (currentGenerationRunAvailable.isEmpty()) {
-                            numberOfPasses++;
-                            if (LOGGER.isDebugEnabled()) {
-                                LOGGER.debug("generated runs:" + stop);
-                            }
-                            runs.subList(0, stop).clear();
-                            currentGenerationRunAvailable.clear();
-                            currentGenerationRunAvailable.set(0, runs.size());
-                            stop = runs.size();
+                            reader = partialRuns.get(0);
                         }
                     } else {
-                        if (LOGGER.isDebugEnabled()) {
-                            LOGGER.debug("final runs: {}", stop);
-                            LOGGER.debug("number of passes: " + numberOfPasses);
+                        RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
+                        IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
+
+                        try {
+                            mergeResultWriter.open();
+                            merge(mergeResultWriter, partialRuns);
+                        } catch (Throwable t) {
+                            mergeResultWriter.fail();
+                            throw t;
+                        } finally {
+                            mergeResultWriter.close();
                         }
-                        merge(finalWriter, partialRuns);
-                        break;
+                        reader = mergeFileWriter.createReader();
                     }
-                }
-            }
-        } catch (Exception e) {
-            if (finalWriter != null) {
-                finalWriter.fail();
-            }
-            throw HyracksDataException.create(e);
-        } finally {
-            try {
-                if (finalWriter != null) {
-                    finalWriter.close();
-                }
-            } finally {
-                for (RunFileReader reader : runs) {
-                    try {
-                        reader.close(); // close is idempotent.
-                    } catch (Exception e) {
-                        if (LOGGER.isWarnEnabled()) {
-                            LOGGER.log(Level.WARN, e.getMessage(), e);
+                    runs.add(reader);
+
+                    if (currentGenerationRunAvailable.isEmpty()) {
+                        numberOfPasses++;
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("generated runs:" + stop);
                         }
+                        runs.subList(0, stop).clear();
+                        currentGenerationRunAvailable.clear();
+                        currentGenerationRunAvailable.set(0, runs.size());
+                        stop = runs.size();
+                    }
+                } else {
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("final runs: {}", stop);
+                        LOGGER.debug("number of passes: " + numberOfPasses);
+                    }
+                    merge(finalWriter, partialRuns);
+                    break;
+                }
+            }
+        } finally {
+            for (RunFileReader reader : runs) {
+                try {
+                    reader.close(); // close is idempotent.
+                } catch (Exception e) {
+                    if (LOGGER.isWarnEnabled()) {
+                        LOGGER.log(Level.WARN, e.getMessage(), e);
                     }
                 }
             }
@@ -237,18 +192,6 @@
         }
     }
 
-    protected abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
-            throws HyracksDataException;
-
-    protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
-
-    protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
-            throws HyracksDataException;
-
-    protected abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
-
-    protected abstract int[] getSortFields();
-
     private void merge(IFrameWriter writer, List<GeneratedRunFileReader> partialRuns) throws HyracksDataException {
         RunMergingFrameReader merger = new RunMergingFrameReader(ctx, partialRuns, inFrames, getSortFields(),
                 comparators, nmkComputer, recordDesc, topK);
@@ -267,4 +210,16 @@
         }
     }
 
+    public abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
+            throws HyracksDataException;
+
+    protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
+
+    protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
+            throws HyracksDataException;
+
+    public abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
+
+    protected abstract int[] getSortFields();
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index 980ad9b..74223d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -79,14 +79,6 @@
     private final BufferInfo info = new BufferInfo(null, -1, -1);
 
     public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
-            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
-            throws HyracksDataException {
-        this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
-                recordDescriptor, Integer.MAX_VALUE);
-    }
-
-    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
             int[] sortFields, INormalizedKeyComputerFactory[] normalizedKeyComputerFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
             throws HyracksDataException {
@@ -286,23 +278,10 @@
         return 0;
     }
 
-    protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
-        System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
-        System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, ptrSize);
-        System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize);
-    }
-
-    protected void copy(int src[], int srcPos, int dest[], int destPos) {
-        System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, ptrSize);
-    }
-
-    protected void copy(int src[], int srcPos, int dest[], int destPos, int n) {
-        System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * ptrSize);
-    }
-
     @Override
     public void close() {
         tupleCount = 0;
+        totalMemoryUsed = 0;
         bufferManager.close();
         tPointers = null;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index 3c11669..3f0b7c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -28,13 +28,18 @@
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 
 public abstract class AbstractSortRunGenerator implements IRunGenerator {
-    protected final List<GeneratedRunFileReader> generatedRunFileReaders;
+
+    private final List<GeneratedRunFileReader> generatedRunFileReaders;
 
     public AbstractSortRunGenerator() {
         generatedRunFileReaders = new LinkedList<>();
     }
 
-    abstract public ISorter getSorter() throws HyracksDataException;
+    /**
+     * Null could be returned. Caller should check if it not null.
+     * @return the sorter associated with the run generator or null if there is no sorter.
+     */
+    abstract public ISorter getSorter();
 
     @Override
     public void open() throws HyracksDataException {
@@ -43,9 +48,10 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (getSorter().hasRemaining()) {
+        ISorter sorter = getSorter();
+        if (sorter != null && sorter.hasRemaining()) {
             if (generatedRunFileReaders.size() <= 0) {
-                getSorter().sort();
+                sorter.sort();
             } else {
                 flushFramesToRun();
             }
@@ -56,13 +62,15 @@
 
     abstract protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException;
 
-    protected void flushFramesToRun() throws HyracksDataException {
-        getSorter().sort();
+    // assumption is that there will always be a sorter (i.e. sorter is not null)
+    void flushFramesToRun() throws HyracksDataException {
+        ISorter sorter = getSorter();
+        sorter.sort();
         RunFileWriter runWriter = getRunFileWriter();
         IFrameWriter flushWriter = getFlushableFrameWriter(runWriter);
         flushWriter.open();
         try {
-            getSorter().flush(flushWriter);
+            sorter.flush(flushWriter);
         } catch (Exception e) {
             flushWriter.fail();
             throw e;
@@ -70,7 +78,7 @@
             flushWriter.close();
         }
         generatedRunFileReaders.add(runWriter.createDeleteOnCloseReader());
-        getSorter().reset();
+        sorter.reset();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index 406703e..6abc064 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -49,12 +49,9 @@
 public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorDescriptor {
 
     private static final Logger LOGGER = LogManager.getLogger();
-
     private static final long serialVersionUID = 1L;
-
     protected static final int SORT_ACTIVITY_ID = 0;
     protected static final int MERGE_ACTIVITY_ID = 1;
-
     protected final int[] sortFields;
     protected final INormalizedKeyComputerFactory[] keyNormalizerFactories;
     protected final IBinaryComparatorFactory[] comparatorFactories;
@@ -90,10 +87,10 @@
     }
 
     public static class SortTaskState extends AbstractStateObject {
-        public List<GeneratedRunFileReader> generatedRunFileReaders;
-        public ISorter sorter;
+        List<GeneratedRunFileReader> generatedRunFileReaders;
+        ISorter sorter;
 
-        public SortTaskState(JobId jobId, TaskId taskId) {
+        SortTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
         }
     }
@@ -101,7 +98,7 @@
     protected abstract class SortActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        public SortActivity(ActivityId id) {
+        protected SortActivity(ActivityId id) {
             super(id);
         }
 
@@ -111,7 +108,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+            return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private AbstractSortRunGenerator runGen;
 
                 @Override
@@ -143,26 +140,24 @@
                     runGen.fail();
                 }
             };
-            return op;
         }
     }
 
     protected abstract class MergeActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        public MergeActivity(ActivityId id) {
+        protected MergeActivity(ActivityId id) {
             super(id);
         }
 
         protected abstract AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
-                List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
-                int necessaryFrames);
+                IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs,
+                IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames);
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
                 @Override
                 public void initialize() throws HyracksDataException {
@@ -176,13 +171,39 @@
                     }
                     INormalizedKeyComputer nmkComputer = keyNormalizerFactories == null ? null
                             : keyNormalizerFactories[0].createNormalizedKeyComputer();
-                    AbstractExternalSortRunMerger merger = getSortRunMerger(ctx, recordDescProvider, writer, sorter,
-                            runs, comparators, nmkComputer, framesLimit);
-                    merger.process();
+                    AbstractExternalSortRunMerger merger =
+                            getSortRunMerger(ctx, recordDescProvider, runs, comparators, nmkComputer, framesLimit);
+                    IFrameWriter wrappingWriter = null;
+                    try {
+                        if (runs.isEmpty()) {
+                            wrappingWriter = merger.prepareSkipMergingFinalResultWriter(writer);
+                            wrappingWriter.open();
+                            if (sorter.hasRemaining()) {
+                                sorter.flush(wrappingWriter);
+                            }
+                        } else {
+                            // eagerly close the sorter here to release memory rather than in finally
+                            sorter.close();
+                            sorter = null;
+                            wrappingWriter = merger.prepareFinalMergeResultWriter(writer);
+                            wrappingWriter.open();
+                            merger.process(wrappingWriter);
+                        }
+                    } catch (Throwable e) {
+                        if (wrappingWriter != null) {
+                            wrappingWriter.fail();
+                        }
+                        throw HyracksDataException.create(e);
+                    } finally {
+                        if (sorter != null) {
+                            sorter.close();
+                        }
+                        if (wrappingWriter != null) {
+                            wrappingWriter.close();
+                        }
+                    }
                 }
             };
-            return op;
         }
     }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index b58d4c7..8b80a26 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -20,7 +20,6 @@
 
 import java.util.List;
 
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -91,12 +90,11 @@
             private static final long serialVersionUID = 1L;
 
             @Override
-            protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
-                    IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
-                    List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators,
-                    INormalizedKeyComputer nmkComputer, int necessaryFrames) {
-                return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators, nmkComputer,
-                        outRecDescs[0], necessaryFrames, outputLimit, writer);
+            protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs,
+                    IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+                return new ExternalSortRunMerger(ctx, runs, sortFields, comparators, nmkComputer, outRecDescs[0],
+                        necessaryFrames, outputLimit);
             }
         };
     }
@@ -113,7 +111,7 @@
             RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy, int outputLimit) {
         super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
         if (framesLimit <= 1) {
-            throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
+            throw new IllegalStateException();// minimum of 2 frames (1 in,1 out)
         }
         this.alg = alg;
         this.policy = policy;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 2b985b9..fb32b0f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -34,15 +34,15 @@
 
     private final int[] sortFields;
 
-    public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<GeneratedRunFileReader> runs,
-            int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
-            RecordDescriptor recordDesc, int framesLimit, int topK, IFrameWriter writer) {
-        super(ctx, sorter, runs, comparators, nmkComputer, recordDesc, framesLimit, topK, writer);
+    public ExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, int[] sortFields,
+            IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
+            int framesLimit, int topK) {
+        super(ctx, runs, comparators, nmkComputer, recordDesc, framesLimit, topK);
         this.sortFields = sortFields;
     }
 
     @Override
-    protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+    public IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
         return nextWriter;
     }
 
@@ -59,7 +59,7 @@
     }
 
     @Override
-    protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+    public IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
         return nextWriter;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index 260b665..92b7d7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -116,4 +116,12 @@
             copy(tPointers, pos2, tPointersTemp, targetPos, rest);
         }
     }
+
+    private void copy(int src[], int srcPos, int dest[], int destPos) {
+        System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, ptrSize);
+    }
+
+    private void copy(int src[], int srcPos, int dest[], int destPos, int n) {
+        System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * ptrSize);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index 486bc7c..ddef0d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -25,17 +25,9 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
 
-public class FrameSorterQuickSort extends AbstractFrameSorter {
+class FrameSorterQuickSort extends AbstractFrameSorter {
 
-    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
-            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
-            throws HyracksDataException {
-        this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
-                recordDescriptor, Integer.MAX_VALUE);
-    }
-
-    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+    FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
             int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
             throws HyracksDataException {
@@ -48,7 +40,7 @@
         sort(0, tupleCount);
     }
 
-    void sort(int offset, int length) throws HyracksDataException {
+    private void sort(int offset, int length) throws HyracksDataException {
         int m = offset + (length >> 1);
 
         int a = offset;
@@ -102,4 +94,9 @@
         }
     }
 
+    private void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
+        System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
+        System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, ptrSize);
+        System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
index 1578975..d8431f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -45,7 +45,7 @@
     protected final IBinaryComparatorFactory[] comparatorFactories;
     protected final RecordDescriptor recordDescriptor;
     protected ITupleSorter tupleSorter;
-    protected IFrameTupleAccessor inAccessor;
+    protected final IFrameTupleAccessor inAccessor;
 
     public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
             INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
@@ -71,7 +71,7 @@
     }
 
     @Override
-    public ISorter getSorter() throws HyracksDataException {
+    public ISorter getSorter() {
         return tupleSorter;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
index 180ecbc..3b07017 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
@@ -49,7 +49,7 @@
     }
 
     @Override
-    public ISorter getSorter() throws HyracksDataException {
+    public ISorter getSorter() {
         if (tupleSorter != null) {
             return tupleSorter;
         } else if (frameSorter != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index dea770a..b29057f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -21,7 +21,6 @@
 
 import java.util.List;
 
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -47,7 +46,7 @@
                 comparatorFactories, recordDescriptor);
     }
 
-    public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
+    private TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
             INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
         super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
@@ -75,12 +74,11 @@
             private static final long serialVersionUID = 1L;
 
             @Override
-            protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
-                    IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
-                    List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators,
-                    INormalizedKeyComputer nmkComputer, int necessaryFrames) {
-                return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators, nmkComputer,
-                        outRecDescs[0], necessaryFrames, topK, writer);
+            protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs,
+                    IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+                return new ExternalSortRunMerger(ctx, runs, sortFields, comparators, nmkComputer, outRecDescs[0],
+                        necessaryFrames, topK);
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
index bcf661f..8fa570c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
@@ -57,10 +57,33 @@
                 }
                 INormalizedKeyComputer nmkComputer = normalizedKeyComputerFactory == null ? null
                         : normalizedKeyComputerFactory.createNormalizedKeyComputer();
-                AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, sorter, runs, keyFields,
-                        inRecordDesc, outputRec, outputRec, numFrames, writer, keyFields, nmkComputer, comparators,
+                AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, runs, keyFields,
+                        inRecordDesc, outputRec, outputRec, numFrames, keyFields, nmkComputer, comparators,
                         partialAggrInState, finalAggrInState, true);
-                merger.process();
+                IFrameWriter wrappingWriter = null;
+                try {
+                    if (runs.isEmpty()) {
+                        wrappingWriter = merger.prepareSkipMergingFinalResultWriter(writer);
+                        wrappingWriter.open();
+                        if (sorter.hasRemaining()) {
+                            sorter.flush(wrappingWriter);
+                        }
+                    } else {
+                        wrappingWriter = merger.prepareFinalMergeResultWriter(writer);
+                        wrappingWriter.open();
+                        merger.process(wrappingWriter);
+                    }
+                } catch (Throwable e) {
+                    if (wrappingWriter != null) {
+                        wrappingWriter.fail();
+                    }
+                    throw HyracksDataException.create(e);
+                } finally {
+                    sorter.close();
+                    if (wrappingWriter != null) {
+                        wrappingWriter.close();
+                    }
+                }
             }
         };
     }