Let a run file reader have an option to delete the run file after it is read.

Change-Id: Iabbd7c3e00489e2dbd8b1d4b87c7c9f81e8116b9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/397
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Jianfeng Jia <jianfeng.jia@gmail.com>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/WorkspaceFileFactory.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/WorkspaceFileFactory.java
index 5b1bc02..e930dab 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/WorkspaceFileFactory.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/WorkspaceFileFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.nc.io;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
@@ -39,7 +40,8 @@
         registry.registerDeallocatable(new IDeallocatable() {
             @Override
             public void deallocate() {
-                fRef.delete();
+                // Delete the created managed file.
+                FileUtils.deleteQuietly(fRef.getFile());
             }
         });
         return fRef;
diff --git a/hyracks/hyracks-dataflow-common/pom.xml b/hyracks/hyracks-dataflow-common/pom.xml
index 27760c1..71e1be9 100644
--- a/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks/hyracks-dataflow-common/pom.xml
@@ -63,11 +63,15 @@
   		<artifactId>hyracks-data-std</artifactId>
   		<version>0.2.17-SNAPSHOT</version>
     </dependency>
-   	<dependency>
+       <dependency>
   		<groupId>org.apache.hyracks</groupId>
   		<artifactId>hyracks-control-nc</artifactId>
   		<version>0.2.17-SNAPSHOT</version>
         <scope>test</scope>
-  	</dependency>
+    </dependency>
+    <dependency>
+         <groupId>commons-io</groupId>
+         <artifactId>commons-io</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index f5d4444..e1a2a89 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.dataflow.common.io;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameReader;
@@ -28,16 +29,17 @@
 
 public class RunFileReader implements IFrameReader {
     private final FileReference file;
+    private IFileHandle handle;
     private final IIOManager ioManager;
     private final long size;
-
-    private IFileHandle handle;
     private long readPtr;
+    private boolean deleteAfterClose;
 
-    public RunFileReader(FileReference file, IIOManager ioManager, long size) {
+    public RunFileReader(FileReference file, IIOManager ioManager, long size, boolean deleteAfterRead) {
         this.file = file;
         this.ioManager = ioManager;
         this.size = size;
+        this.deleteAfterClose = deleteAfterRead;
     }
 
     @Override
@@ -77,6 +79,9 @@
     @Override
     public void close() throws HyracksDataException {
         ioManager.close(handle);
+        if (deleteAfterClose) {
+            FileUtils.deleteQuietly(file.getFile());
+        }
     }
 
     public long getFileSize() {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index 755e336..2af40fd 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -77,6 +77,13 @@
         if (failed) {
             throw new HyracksDataException("createReader() called on a failed RunFileWriter");
         }
-        return new RunFileReader(file, ioManager, size);
+        return new RunFileReader(file, ioManager, size, false);
+    }
+
+    public RunFileReader createDeleteOnCloseReader() throws HyracksDataException {
+        if (failed) {
+            throw new HyracksDataException("createReader() called on a failed RunFileWriter");
+        }
+        return new RunFileReader(file, ioManager, size, true);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
index 568a62d..fb81912 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -25,7 +25,6 @@
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameReader;
@@ -124,14 +123,15 @@
         }
         List<RunAndMaxFrameSizePair> runs = new LinkedList<>();
         for (int i = 0; i < runFileWriters.size(); i++) {
-            runs.add(new RunAndMaxFrameSizePair(runFileWriters.get(i).createReader(), runFileMaxFrameSize.get(i)));
+            runs.add(new RunAndMaxFrameSizePair(runFileWriters.get(i).createDeleteOnCloseReader(), runFileMaxFrameSize
+                    .get(i)));
         }
         RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
-        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 },
-                comparators, null, recordDescriptor, framesLimit, rfw);
+        ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators, null,
+                recordDescriptor, framesLimit, rfw);
         merger.process();
 
-        reader = rfw.createReader();
+        reader = rfw.createDeleteOnCloseReader();
         reader.open();
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index f3ee778..4311809 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -137,6 +137,6 @@
             writer.close();
         }
         gTable.reset();
-        state.getRuns().add(((RunFileWriter) writer).createReader());
+        state.getRuns().add(writer.createDeleteOnCloseReader());
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index fb56c0e..814e537 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -118,6 +118,7 @@
         this.outRecordDescriptor = outRecordDescriptor;
     }
 
+    @Override
     public void initialize() throws HyracksDataException {
         aggState = (ExternalGroupState) ctx.getStateObject(stateId);
         runs = aggState.getRuns();
@@ -281,7 +282,7 @@
              * file list
              */
             if (!finalPass) {
-                runs.add(0, ((RunFileWriter) writer).createReader());
+                runs.add(0, ((RunFileWriter) writer).createDeleteOnCloseReader());
             }
         } finally {
             if (!finalPass) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index d50d307..349cc5a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -127,7 +127,7 @@
 
                 // build
                 if (buildWriter != null) {
-                    RunFileReader buildReader = buildWriter.createReader();
+                    RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
                     buildReader.open();
                     while (buildReader.nextFrame(buffer)) {
                         ByteBuffer copyBuffer = ctx.allocateFrame(buffer.getFrameSize());
@@ -139,7 +139,7 @@
                 }
 
                 // probe
-                RunFileReader probeReader = probeWriter.createReader();
+                RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
                 probeReader.open();
                 while (probeReader.nextFrame(buffer)) {
                     joiner.join(buffer.getBuffer(), writer);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index e115937..f72d528 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -80,8 +80,10 @@
 
     /**
      * @param spec
-     * @param memsize               in frames
-     * @param inputsize0            in frames
+     * @param memsize
+     *            in frames
+     * @param inputsize0
+     *            in frames
      * @param recordsPerFrame
      * @param factor
      * @param keys0
@@ -300,8 +302,8 @@
                         if (memsize > inputsize0) {
                             state.nPartitions = 0;
                         } else {
-                            state.nPartitions = (int) (Math.ceil((double) (inputsize0 * factor / nPartitions - memsize)
-                                    / (double) (memsize - 1)));
+                            state.nPartitions = (int) (Math.ceil((inputsize0 * factor / nPartitions - memsize)
+                                    / (memsize - 1)));
                         }
                         if (state.nPartitions <= 0) {
                             // becomes in-memory HJ
@@ -324,10 +326,9 @@
                             .createPartitioner();
                     int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
                     ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-                    state.joiner = new InMemoryHashJoin(ctx, tableSize,
-                            new FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1), hpc1,
-                            new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table,
-                            predEvaluator);
+                    state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
+                            new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
+                            isLeftOuter, nullWriters1, table, predEvaluator);
                     bufferForPartitions = new IFrame[state.nPartitions];
                     state.fWriters = new RunFileWriter[state.nPartitions];
                     for (int i = 0; i < state.nPartitions; i++) {
@@ -432,7 +433,7 @@
                         int tupleCount0 = accessorProbe.getTupleCount();
                         for (int i = 0; i < tupleCount0; ++i) {
 
-                            int entry ;
+                            int entry;
                             if (state.memoryForHashtable == 0) {
                                 entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
                                 boolean newBuffer = false;
@@ -513,13 +514,12 @@
                                 continue;
                             }
                             table.reset();
-                            InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
-                                    rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
-                                    new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1,
-                                    table, predEvaluator);
+                            InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0),
+                                    hpcRep0, new FrameTupleAccessor(rd1), hpcRep1, new FrameTuplePairComparator(keys0,
+                                            keys1, comparators), isLeftOuter, nullWriters1, table, predEvaluator);
 
                             if (buildWriter != null) {
-                                RunFileReader buildReader = buildWriter.createReader();
+                                RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
                                 buildReader.open();
                                 while (buildReader.nextFrame(inBuffer)) {
                                     ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
@@ -531,7 +531,7 @@
                             }
 
                             // probe
-                            RunFileReader probeReader = probeWriter.createReader();
+                            RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
                             probeReader.open();
                             while (probeReader.nextFrame(inBuffer)) {
                                 joiner.join(inBuffer.getBuffer(), writer);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index db7c308..5ca7700 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -55,12 +55,11 @@
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder nullTupleBuilder;
     private final IPredicateEvaluator predEvaluator;
-    private boolean isReversed;        //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
+    private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
 
     public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
             ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
-            INullWriter[] nullWriters1)
-            throws HyracksDataException {
+            INullWriter[] nullWriters1) throws HyracksDataException {
         this.accessorInner = accessor1;
         this.accessorOuter = accessor0;
         this.appender = new FrameTupleAppender();
@@ -166,7 +165,7 @@
     }
 
     private boolean evaluatePredicate(int tIx1, int tIx2) {
-        if (isReversed) {        //Role Reversal Optimization is triggered
+        if (isReversed) { //Role Reversal Optimization is triggered
             return ((predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1));
         } else {
             return ((predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2));
@@ -194,7 +193,7 @@
     }
 
     public void closeJoin(IFrameWriter writer) throws HyracksDataException {
-        runFileReader = runFileWriter.createReader();
+        runFileReader = runFileWriter.createDeleteOnCloseReader();
         runFileReader.open();
         while (runFileReader.nextFrame(innerBuffer)) {
             for (int i = 0; i < currentMemSize; i++) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index e332ecf..58e1b29 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -185,17 +185,14 @@
         buildPSizeInFrames = new int[numOfPartitions];
         freeFramesCounter = memForJoin - numOfPartitions;
 
-        for (int i = 0; i
-                < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
+        for (int i = 0; i < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
             memBuffs[i] = new VSizeFrame(ctx);
             curPBuff[i] = i;
             nextBuff[i] = -1;
             buildPSizeInFrames[i] = 1; //The dedicated initial buffer
         }
 
-        nextFreeBuffIx = ((numOfPartitions < memForJoin) ?
-                numOfPartitions :
-                NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
+        nextFreeBuffIx = ((numOfPartitions < memForJoin) ? numOfPartitions : NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
         for (int i = numOfPartitions; i < memBuffs.length; i++) {
             nextBuff[i] = UNALLOCATED_FRAME;
         }
@@ -235,8 +232,7 @@
                 if (newBuffIx == NO_MORE_FREE_BUFFER) { //Spill one partition
                     int pidToSpill = selectPartitionToSpill();
                     if (pidToSpill == -1) { //No more partition to spill
-                        throw new HyracksDataException(
-                                "not enough memory for Hash Join (Allocation exceeds the limit)");
+                        throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
                     }
                     spillPartition(pidToSpill);
                     buildTupAppender.reset(memBuffs[pidToSpill], true);
@@ -258,7 +254,7 @@
                 if (buildTupAppender.append(accessorBuild, tid)) {
                     break;
                 }
-                //Dedicated in-memory buffer for the partition is full, needed to be flushed first 
+                //Dedicated in-memory buffer for the partition is full, needed to be flushed first
                 buildWrite(pid, partition.getBuffer());
                 partition.reset();
                 needClear = true;
@@ -355,8 +351,7 @@
         }
 
         ByteBuffer buff = null;
-        for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus
-                .nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
+        for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
             buff = memBuffs[i].getBuffer();
             accessorBuild.reset(buff);
             if (accessorBuild.getTupleCount() > 0) {
@@ -427,7 +422,7 @@
     }
 
     private void loadPartitionInMem(int pid, RunFileWriter wr, int[] buffs) throws HyracksDataException {
-        RunFileReader r = wr.createReader();
+        RunFileReader r = wr.createDeleteOnCloseReader();
         r.open();
         int counter = 0;
         ByteBuffer mBuff = null;
@@ -469,9 +464,8 @@
     private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
         ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
         this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new FrameTupleAccessor(probeRd), probeHpc,
-                new FrameTupleAccessor(buildRd), buildHpc,
-                new FrameTuplePairComparator(probeKeys, buildKeys, comparators), isLeftOuter, nullWriters1, table,
-                predEvaluator, isReversed);
+                new FrameTupleAccessor(buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
+                        comparators), isLeftOuter, nullWriters1, table, predEvaluator, isReversed);
     }
 
     private void cacheInMemJoin() throws HyracksDataException {
@@ -495,8 +489,8 @@
         }
         curPBuff = new int[numOfPartitions];
         int nextBuffIxToAlloc = 0;
-        /* We only need to allocate one frame per spilled partition. 
-         * Resident partitions do not need frames in probe, as their tuples join 
+        /* We only need to allocate one frame per spilled partition.
+         * Resident partitions do not need frames in probe, as their tuples join
          * immediately with the resident build tuples using the inMemoryHashJoin */
         for (int i = 0; i < numOfPartitions; i++) {
             curPBuff[i] = (pStatus.get(i)) ? nextBuffIxToAlloc++ : BUFFER_FOR_RESIDENT_PARTS;
@@ -559,8 +553,7 @@
 
     }
 
-    public void closeProbe(IFrameWriter writer) throws
-            HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+    public void closeProbe(IFrameWriter writer) throws HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
         inMemJoiner.join(probeResBuff.getBuffer(), writer);
         inMemJoiner.closeJoin(writer);
 
@@ -593,7 +586,7 @@
     }
 
     public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
-        return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader());
+        return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createDeleteOnCloseReader());
     }
 
     public long getBuildPartitionSize(int pid) {
@@ -605,7 +598,7 @@
     }
 
     public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
-        return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader());
+        return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createDeleteOnCloseReader());
     }
 
     public long getProbePartitionSize(int pid) {
@@ -659,10 +652,10 @@
 
         double avgBuildSpSz = sumOfBuildSpilledSizes / numOfSpilledPartitions;
         double avgProbeSpSz = sumOfProbeSpilledSizes / numOfSpilledPartitions;
-        String s =
-                "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t" + numOfSpilledPartitions
-                        + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t" + avgProbeSpSz
-                        + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t" + freeFramesCounter;
+        String s = "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t"
+                + numOfSpilledPartitions + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t"
+                + avgProbeSpSz + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t"
+                + freeFramesCounter;
         return s;
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 145ec3c..b9c2fb1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -32,7 +32,6 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 
 public class MaterializerTaskState extends AbstractStateObject {
@@ -68,7 +67,7 @@
     }
 
     public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
-        RunFileReader in = out.createReader();
+        RunFileReader in = out.createDeleteOnCloseReader();
         writer.open();
         try {
             in.open();
@@ -83,7 +82,7 @@
             writer.close();
         }
     }
-    
+
     public void deleteFile() {
         out.getFileReference().delete();
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index cde11a0..b5ba099 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -66,7 +66,7 @@
         } finally {
             flushWriter.close();
         }
-        runAndMaxSizes.add(new RunAndMaxFrameSizePair(runWriter.createReader(), maxFlushedFrameSize));
+        runAndMaxSizes.add(new RunAndMaxFrameSizePair(runWriter.createDeleteOnCloseReader(), maxFlushedFrameSize));
         getSorter().reset();
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 9715b71..d44826e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -63,8 +63,8 @@
     public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<RunAndMaxFrameSizePair> runs,
             int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
             RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
-        this(ctx, sorter, runs, sortFields, comparators, nmkComputer, recordDesc, framesLimit,
-                Integer.MAX_VALUE, writer);
+        this(ctx, sorter, runs, sortFields, comparators, nmkComputer, recordDesc, framesLimit, Integer.MAX_VALUE,
+                writer);
     }
 
     public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<RunAndMaxFrameSizePair> runs,
@@ -118,8 +118,7 @@
                 while (true) {
 
                     int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
-                            currentGenerationRunAvailable,
-                            stop);
+                            currentGenerationRunAvailable, stop);
                     prepareFrames(unUsed, inFrames, partialRuns);
 
                     if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
@@ -143,7 +142,7 @@
                             mergedMaxFrameSize = merge(mergeResultWriter, partialRuns);
                             mergeResultWriter.close();
 
-                            reader = mergeFileWriter.createReader();
+                            reader = mergeFileWriter.createDeleteOnCloseReader();
                         }
 
                         appendNewRuns(reader, mergedMaxFrameSize);
@@ -196,8 +195,7 @@
         return budget;
     }
 
-    private void prepareFrames(int extraFreeMem, List<GroupVSizeFrame> inFrames,
-            List<RunAndMaxFrameSizePair> patialRuns)
+    private void prepareFrames(int extraFreeMem, List<GroupVSizeFrame> inFrames, List<RunAndMaxFrameSizePair> patialRuns)
             throws HyracksDataException {
         if (extraFreeMem > 0 && patialRuns.size() > 1) {
             int extraFrames = extraFreeMem / ctx.getInitialFrameSize();
@@ -246,14 +244,13 @@
         return sortFields;
     }
 
-    private int merge(IFrameWriter writer, List<RunAndMaxFrameSizePair> partialRuns)
-            throws HyracksDataException {
+    private int merge(IFrameWriter writer, List<RunAndMaxFrameSizePair> partialRuns) throws HyracksDataException {
         tempRuns.clear();
         for (int i = 0; i < partialRuns.size(); i++) {
             tempRuns.add(partialRuns.get(i).run);
         }
-        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, tempRuns, inFrames, getSortFields(),
-                comparators, nmkComputer, recordDesc, topK);
+        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, tempRuns, inFrames, getSortFields(), comparators,
+                nmkComputer, recordDesc, topK);
         int maxFrameSize = 0;
         int io = 0;
         merger.open();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index 3cb5685..5f49dd5 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -116,5 +116,11 @@
             <type>jar</type>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>com.e-movimento.tinytools</groupId>
+            <artifactId>privilegedaccessor</artifactId>
+            <version>1.2.2</version>
+            <scope>test</scope>
+        </dependency>
 	</dependencies>
 </project>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index f51e061..1f339152 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -28,8 +28,6 @@
 import java.util.Map;
 import java.util.Random;
 
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -43,7 +41,6 @@
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -53,11 +50,12 @@
 import org.apache.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
 import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
 import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Test;
 
 public abstract class AbstractRunGeneratorTest {
     static TestUtils testUtils = new TestUtils();
-    static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] {
-            IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+    static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE };
     static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
     static Random GRandom = new Random(System.currentTimeMillis());
     static int[] SortFields = new int[] { 0, 1 };
@@ -74,14 +72,14 @@
     abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
             throws HyracksDataException;
 
-    protected List<RunAndMaxFrameSizePair> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
-            int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+    protected List<RunAndMaxFrameSizePair> testSortRecords(int pageSize, int frameLimit, int numRuns,
+            int minRecordSize, int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
         IHyracksTaskContext ctx = testUtils.create(pageSize);
 
         HashMap<Integer, String> keyValuePair = new HashMap<>();
         List<IFrame> frameList = new ArrayList<>();
-        prepareData(ctx, frameList, pageSize * frameLimit * numRuns, minRecordSize, maxRecordSize,
-                specialData, keyValuePair);
+        prepareData(ctx, frameList, pageSize * frameLimit * numRuns, minRecordSize, maxRecordSize, specialData,
+                keyValuePair);
         AbstractSortRunGenerator runGenerator = getSortRunGenerator(ctx, frameLimit, keyValuePair.size());
         runGenerator.open();
         for (IFrame frame : frameList) {
@@ -94,12 +92,6 @@
 
     static void matchResult(IHyracksTaskContext ctx, List<RunAndMaxFrameSizePair> runs,
             Map<Integer, String> keyValuePair) throws HyracksDataException {
-        IFrame frame = new VSizeFrame(ctx);
-        FrameTupleAccessor fta = new FrameTupleAccessor(RecordDesc);
-
-        HashMap<Integer, String> copyMap = new HashMap<>(keyValuePair);
-        assertReadSorted(runs, fta, frame, copyMap);
-
         HashMap<Integer, String> copyMap2 = new HashMap<>(keyValuePair);
         int maxFrameSizes = 0;
         for (RunAndMaxFrameSizePair run : runs) {
@@ -163,8 +155,8 @@
                 tb.addField(IntegerSerializerDeserializer.INSTANCE, entry.getKey());
                 tb.addField(UTF8StringSerializerDeserializer.INSTANCE, entry.getValue());
 
-                VSizeFrame frame = new VSizeFrame(ctx, FrameHelper
-                        .calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length, tb.getSize(), ctx.getInitialFrameSize()));
+                VSizeFrame frame = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(
+                        tb.getFieldEndOffsets().length, tb.getSize(), ctx.getInitialFrameSize()));
                 appender.reset(frame, true);
                 assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()));
                 frameList.add(frame);
@@ -186,9 +178,8 @@
                 if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                     frameList.add(frame);
                     datasize += frame.getFrameSize();
-                    frame = new VSizeFrame(ctx, FrameHelper
-                            .calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length, tb.getSize(),
-                                    ctx.getInitialFrameSize()));
+                    frame = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length,
+                            tb.getSize(), ctx.getInitialFrameSize()));
                     appender.reset(frame, true);
                     assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()));
                 }
@@ -202,8 +193,7 @@
 
     }
 
-    static String generateRandomRecord(int minRecordSize, int maxRecordSize)
-            throws HyracksDataException {
+    static String generateRandomRecord(int minRecordSize, int maxRecordSize) throws HyracksDataException {
         int size = GRandom.nextInt(maxRecordSize - minRecordSize + 1) + minRecordSize;
         return generateRandomFixSizedString(size);
 
@@ -245,8 +235,8 @@
         int numRuns = 2;
         int minRecordSize = pageSize;
         int maxRecordSize = (int) (pageSize * 1.8);
-        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
-                null);
+        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+                maxRecordSize, null);
         assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2);
     }
 
@@ -258,8 +248,8 @@
         int minRecordSize = 20;
         int maxRecordSize = pageSize / 2;
         HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1);
-        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
-                specialPair);
+        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+                maxRecordSize, specialPair);
 
         int max = 0;
         for (RunAndMaxFrameSizePair run : size) {
@@ -276,8 +266,8 @@
         HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
         int minRecordSize = 10;
         int maxRecordSize = pageSize / 2;
-        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
-                specialPair);
+        List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+                maxRecordSize, specialPair);
 
     }
 }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index cb85755..ca0a6bb 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -19,14 +19,14 @@
 
 package org.apache.hyracks.tests.unit;
 
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.ComparatorFactories;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.GRandom;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.RecordDesc;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.SortFields;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.generateRandomRecord;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.matchResult;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.prepareData;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.testUtils;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.GRandom;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.generateRandomRecord;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.matchResult;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.prepareData;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.testUtils;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -40,7 +40,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.junit.Test;
+import junit.extensions.PA;
 
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
@@ -55,17 +55,17 @@
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
 import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
 import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
+import org.junit.Test;
 
 public class RunMergingFrameReaderTest {
-    static IBinaryComparator[] Comparators = new IBinaryComparator[] {
-            ComparatorFactories[0].createBinaryComparator(),
-            ComparatorFactories[1].createBinaryComparator(),
-    };
+    static IBinaryComparator[] Comparators = new IBinaryComparator[] { ComparatorFactories[0].createBinaryComparator(),
+            ComparatorFactories[1].createBinaryComparator(), };
 
     static class TestFrameReader implements IFrameReader {
 
@@ -191,8 +191,8 @@
         List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
         List<TestFrameReader> readerList = new ArrayList<>(numRuns);
         List<IFrame> frameList = new ArrayList<>(numRuns);
-        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
-                minRecordSize, maxRecordSize, readerList, frameList, keyValueMapList);
+        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+                frameList, keyValueMapList);
 
         RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
                 null, RecordDesc);
@@ -212,8 +212,8 @@
         List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
         List<TestFrameReader> readerList = new ArrayList<>(numRuns);
         List<IFrame> frameList = new ArrayList<>(numRuns);
-        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
-                minRecordSize, maxRecordSize, readerList, frameList, keyValueMapList);
+        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+                frameList, keyValueMapList);
 
         RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
                 null, RecordDesc);
@@ -234,12 +234,11 @@
             List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
             List<TestFrameReader> readerList = new ArrayList<>(numRuns);
             List<IFrame> frameList = new ArrayList<>(numRuns);
-            prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
-                    minRecordSize, maxRecordSize, readerList, frameList, keyValueMapList);
+            prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize,
+                    readerList, frameList, keyValueMapList);
 
             RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields,
-                    Comparators,
-                    null, RecordDesc, topK);
+                    Comparators, null, RecordDesc, topK);
             int totoalCount = testMergeSucceedInner(ctx, reader, keyValueMapList);
             int newCount = 0;
             for (Map<Integer, String> x : keyValueMapList) {
@@ -285,25 +284,23 @@
         List<Map<Integer, String>> keyValueMap = new ArrayList<>();
         List<TestFrameReader> readerList = new ArrayList<>();
         List<IFrame> frameList = new ArrayList<>();
-        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
-                minRecordSize, maxRecordSize, readerList, frameList, keyValueMap);
+        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+                frameList, keyValueMap);
 
         minRecordSize = pageSize;
         maxRecordSize = pageSize;
         numFramesPerRun = 4;
-        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
-                minRecordSize, maxRecordSize, readerList, frameList, keyValueMap);
+        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+                frameList, keyValueMap);
 
         minRecordSize = pageSize * 2;
         maxRecordSize = pageSize * 2;
         numFramesPerRun = 6;
-        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
-                minRecordSize, maxRecordSize, readerList, frameList, keyValueMap);
+        prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+                frameList, keyValueMap);
 
-        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields,
-                Comparators,
-                null,
-                RecordDesc);
+        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
+                null, RecordDesc);
         testMergeSucceed(ctx, reader, keyValueMap);
     }
 
@@ -316,15 +313,14 @@
         int maxRecordSize = pageSize / 2;
 
         IHyracksTaskContext ctx = testUtils.create(pageSize);
-        ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields,
-                null, ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT,
-                numFramesPerRun);
+        ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null,
+                ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, numFramesPerRun);
 
         runGenerator.open();
         Map<Integer, String> keyValuePair = new HashMap<>();
         List<IFrame> frameList = new ArrayList<>();
-        prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize,
-                null, keyValuePair);
+        prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize, null,
+                keyValuePair);
         for (IFrame frame : frameList) {
             runGenerator.nextFrame(frame.getBuffer());
         }
@@ -333,18 +329,24 @@
         minRecordSize = pageSize;
         maxRecordSize = pageSize;
         frameList.clear();
-        prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize,
-                null, keyValuePair);
+        prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize, null,
+                keyValuePair);
         for (IFrame frame : frameList) {
             runGenerator.nextFrame(frame.getBuffer());
         }
-
         runGenerator.close();
         List<IFrame> inFrame = new ArrayList<>(runGenerator.getRuns().size());
         for (RunAndMaxFrameSizePair max : runGenerator.getRuns()) {
             inFrame.add(new GroupVSizeFrame(ctx, max.maxFrameSize));
         }
+
+        // Let each run file reader not delete the run file when it is read and closed.
+        for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
+            RunFileReader runFileReader = (RunFileReader) run.run;
+            PA.setValue(runFileReader, "deleteAfterClose", false);
+        }
         matchResult(ctx, runGenerator.getRuns(), keyValuePair);
+
         List<IFrameReader> runs = new ArrayList<>();
         for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
             runs.add(run.run);
@@ -399,10 +401,9 @@
         }
     }
 
-    static void prepareRandomInputRunList(IHyracksTaskContext ctx, int pageSize, int numRuns,
-            int numFramesPerRun, int minRecordSize, int maxRecordSize,
-            List<TestFrameReader> readerList, List<IFrame> frameList, List<Map<Integer, String>> keyValueMap)
-            throws HyracksDataException {
+    static void prepareRandomInputRunList(IHyracksTaskContext ctx, int pageSize, int numRuns, int numFramesPerRun,
+            int minRecordSize, int maxRecordSize, List<TestFrameReader> readerList, List<IFrame> frameList,
+            List<Map<Integer, String>> keyValueMap) throws HyracksDataException {
         for (int i = 0; i < numRuns; i++) {
             readerList.add(new TestFrameReader(pageSize, numFramesPerRun, minRecordSize, maxRecordSize));
             frameList.add(new VSizeFrame(ctx, readerList.get(readerList.size() - 1).maxFrameSize));