Let a run file reader have an option to delete the run file after it is read.
Change-Id: Iabbd7c3e00489e2dbd8b1d4b87c7c9f81e8116b9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/397
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Jianfeng Jia <jianfeng.jia@gmail.com>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/WorkspaceFileFactory.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/WorkspaceFileFactory.java
index 5b1bc02..e930dab 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/WorkspaceFileFactory.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/WorkspaceFileFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.nc.io;
+import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
@@ -39,7 +40,8 @@
registry.registerDeallocatable(new IDeallocatable() {
@Override
public void deallocate() {
- fRef.delete();
+ // Delete the created managed file.
+ FileUtils.deleteQuietly(fRef.getFile());
}
});
return fRef;
diff --git a/hyracks/hyracks-dataflow-common/pom.xml b/hyracks/hyracks-dataflow-common/pom.xml
index 27760c1..71e1be9 100644
--- a/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks/hyracks-dataflow-common/pom.xml
@@ -63,11 +63,15 @@
<artifactId>hyracks-data-std</artifactId>
<version>0.2.17-SNAPSHOT</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
<version>0.2.17-SNAPSHOT</version>
<scope>test</scope>
- </dependency>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index f5d4444..e1a2a89 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.dataflow.common.io;
+import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameReader;
@@ -28,16 +29,17 @@
public class RunFileReader implements IFrameReader {
private final FileReference file;
+ private IFileHandle handle;
private final IIOManager ioManager;
private final long size;
-
- private IFileHandle handle;
private long readPtr;
+ private boolean deleteAfterClose;
- public RunFileReader(FileReference file, IIOManager ioManager, long size) {
+ public RunFileReader(FileReference file, IIOManager ioManager, long size, boolean deleteAfterRead) {
this.file = file;
this.ioManager = ioManager;
this.size = size;
+ this.deleteAfterClose = deleteAfterRead;
}
@Override
@@ -77,6 +79,9 @@
@Override
public void close() throws HyracksDataException {
ioManager.close(handle);
+ if (deleteAfterClose) {
+ FileUtils.deleteQuietly(file.getFile());
+ }
}
public long getFileSize() {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index 755e336..2af40fd 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -77,6 +77,13 @@
if (failed) {
throw new HyracksDataException("createReader() called on a failed RunFileWriter");
}
- return new RunFileReader(file, ioManager, size);
+ return new RunFileReader(file, ioManager, size, false);
+ }
+
+ public RunFileReader createDeleteOnCloseReader() throws HyracksDataException {
+ if (failed) {
+ throw new HyracksDataException("createReader() called on a failed RunFileWriter");
+ }
+ return new RunFileReader(file, ioManager, size, true);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
index 568a62d..fb81912 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/ShuffleFrameReader.java
@@ -25,7 +25,6 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameReader;
@@ -124,14 +123,15 @@
}
List<RunAndMaxFrameSizePair> runs = new LinkedList<>();
for (int i = 0; i < runFileWriters.size(); i++) {
- runs.add(new RunAndMaxFrameSizePair(runFileWriters.get(i).createReader(), runFileMaxFrameSize.get(i)));
+ runs.add(new RunAndMaxFrameSizePair(runFileWriters.get(i).createDeleteOnCloseReader(), runFileMaxFrameSize
+ .get(i)));
}
RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
- ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 },
- comparators, null, recordDescriptor, framesLimit, rfw);
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators, null,
+ recordDescriptor, framesLimit, rfw);
merger.process();
- reader = rfw.createReader();
+ reader = rfw.createDeleteOnCloseReader();
reader.open();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index f3ee778..4311809 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -137,6 +137,6 @@
writer.close();
}
gTable.reset();
- state.getRuns().add(((RunFileWriter) writer).createReader());
+ state.getRuns().add(writer.createDeleteOnCloseReader());
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index fb56c0e..814e537 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -118,6 +118,7 @@
this.outRecordDescriptor = outRecordDescriptor;
}
+ @Override
public void initialize() throws HyracksDataException {
aggState = (ExternalGroupState) ctx.getStateObject(stateId);
runs = aggState.getRuns();
@@ -281,7 +282,7 @@
* file list
*/
if (!finalPass) {
- runs.add(0, ((RunFileWriter) writer).createReader());
+ runs.add(0, ((RunFileWriter) writer).createDeleteOnCloseReader());
}
} finally {
if (!finalPass) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index d50d307..349cc5a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -127,7 +127,7 @@
// build
if (buildWriter != null) {
- RunFileReader buildReader = buildWriter.createReader();
+ RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
buildReader.open();
while (buildReader.nextFrame(buffer)) {
ByteBuffer copyBuffer = ctx.allocateFrame(buffer.getFrameSize());
@@ -139,7 +139,7 @@
}
// probe
- RunFileReader probeReader = probeWriter.createReader();
+ RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
probeReader.open();
while (probeReader.nextFrame(buffer)) {
joiner.join(buffer.getBuffer(), writer);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index e115937..f72d528 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -80,8 +80,10 @@
/**
* @param spec
- * @param memsize in frames
- * @param inputsize0 in frames
+ * @param memsize
+ * in frames
+ * @param inputsize0
+ * in frames
* @param recordsPerFrame
* @param factor
* @param keys0
@@ -300,8 +302,8 @@
if (memsize > inputsize0) {
state.nPartitions = 0;
} else {
- state.nPartitions = (int) (Math.ceil((double) (inputsize0 * factor / nPartitions - memsize)
- / (double) (memsize - 1)));
+ state.nPartitions = (int) (Math.ceil((inputsize0 * factor / nPartitions - memsize)
+ / (memsize - 1)));
}
if (state.nPartitions <= 0) {
// becomes in-memory HJ
@@ -324,10 +326,9 @@
.createPartitioner();
int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
ISerializableTable table = new SerializableHashTable(tableSize, ctx);
- state.joiner = new InMemoryHashJoin(ctx, tableSize,
- new FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1), hpc1,
- new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table,
- predEvaluator);
+ state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
+ new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
+ isLeftOuter, nullWriters1, table, predEvaluator);
bufferForPartitions = new IFrame[state.nPartitions];
state.fWriters = new RunFileWriter[state.nPartitions];
for (int i = 0; i < state.nPartitions; i++) {
@@ -432,7 +433,7 @@
int tupleCount0 = accessorProbe.getTupleCount();
for (int i = 0; i < tupleCount0; ++i) {
- int entry ;
+ int entry;
if (state.memoryForHashtable == 0) {
entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
boolean newBuffer = false;
@@ -513,13 +514,12 @@
continue;
}
table.reset();
- InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
- rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1,
- table, predEvaluator);
+ InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0),
+ hpcRep0, new FrameTupleAccessor(rd1), hpcRep1, new FrameTuplePairComparator(keys0,
+ keys1, comparators), isLeftOuter, nullWriters1, table, predEvaluator);
if (buildWriter != null) {
- RunFileReader buildReader = buildWriter.createReader();
+ RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
buildReader.open();
while (buildReader.nextFrame(inBuffer)) {
ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
@@ -531,7 +531,7 @@
}
// probe
- RunFileReader probeReader = probeWriter.createReader();
+ RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
probeReader.open();
while (probeReader.nextFrame(inBuffer)) {
joiner.join(inBuffer.getBuffer(), writer);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index db7c308..5ca7700 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -55,12 +55,11 @@
private final boolean isLeftOuter;
private final ArrayTupleBuilder nullTupleBuilder;
private final IPredicateEvaluator predEvaluator;
- private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
+ private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
- INullWriter[] nullWriters1)
- throws HyracksDataException {
+ INullWriter[] nullWriters1) throws HyracksDataException {
this.accessorInner = accessor1;
this.accessorOuter = accessor0;
this.appender = new FrameTupleAppender();
@@ -166,7 +165,7 @@
}
private boolean evaluatePredicate(int tIx1, int tIx2) {
- if (isReversed) { //Role Reversal Optimization is triggered
+ if (isReversed) { //Role Reversal Optimization is triggered
return ((predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1));
} else {
return ((predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2));
@@ -194,7 +193,7 @@
}
public void closeJoin(IFrameWriter writer) throws HyracksDataException {
- runFileReader = runFileWriter.createReader();
+ runFileReader = runFileWriter.createDeleteOnCloseReader();
runFileReader.open();
while (runFileReader.nextFrame(innerBuffer)) {
for (int i = 0; i < currentMemSize; i++) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index e332ecf..58e1b29 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -185,17 +185,14 @@
buildPSizeInFrames = new int[numOfPartitions];
freeFramesCounter = memForJoin - numOfPartitions;
- for (int i = 0; i
- < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
+ for (int i = 0; i < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
memBuffs[i] = new VSizeFrame(ctx);
curPBuff[i] = i;
nextBuff[i] = -1;
buildPSizeInFrames[i] = 1; //The dedicated initial buffer
}
- nextFreeBuffIx = ((numOfPartitions < memForJoin) ?
- numOfPartitions :
- NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
+ nextFreeBuffIx = ((numOfPartitions < memForJoin) ? numOfPartitions : NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
for (int i = numOfPartitions; i < memBuffs.length; i++) {
nextBuff[i] = UNALLOCATED_FRAME;
}
@@ -235,8 +232,7 @@
if (newBuffIx == NO_MORE_FREE_BUFFER) { //Spill one partition
int pidToSpill = selectPartitionToSpill();
if (pidToSpill == -1) { //No more partition to spill
- throw new HyracksDataException(
- "not enough memory for Hash Join (Allocation exceeds the limit)");
+ throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
}
spillPartition(pidToSpill);
buildTupAppender.reset(memBuffs[pidToSpill], true);
@@ -258,7 +254,7 @@
if (buildTupAppender.append(accessorBuild, tid)) {
break;
}
- //Dedicated in-memory buffer for the partition is full, needed to be flushed first
+ //Dedicated in-memory buffer for the partition is full, needed to be flushed first
buildWrite(pid, partition.getBuffer());
partition.reset();
needClear = true;
@@ -355,8 +351,7 @@
}
ByteBuffer buff = null;
- for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus
- .nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
+ for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
buff = memBuffs[i].getBuffer();
accessorBuild.reset(buff);
if (accessorBuild.getTupleCount() > 0) {
@@ -427,7 +422,7 @@
}
private void loadPartitionInMem(int pid, RunFileWriter wr, int[] buffs) throws HyracksDataException {
- RunFileReader r = wr.createReader();
+ RunFileReader r = wr.createDeleteOnCloseReader();
r.open();
int counter = 0;
ByteBuffer mBuff = null;
@@ -469,9 +464,8 @@
private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new FrameTupleAccessor(probeRd), probeHpc,
- new FrameTupleAccessor(buildRd), buildHpc,
- new FrameTuplePairComparator(probeKeys, buildKeys, comparators), isLeftOuter, nullWriters1, table,
- predEvaluator, isReversed);
+ new FrameTupleAccessor(buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
+ comparators), isLeftOuter, nullWriters1, table, predEvaluator, isReversed);
}
private void cacheInMemJoin() throws HyracksDataException {
@@ -495,8 +489,8 @@
}
curPBuff = new int[numOfPartitions];
int nextBuffIxToAlloc = 0;
- /* We only need to allocate one frame per spilled partition.
- * Resident partitions do not need frames in probe, as their tuples join
+ /* We only need to allocate one frame per spilled partition.
+ * Resident partitions do not need frames in probe, as their tuples join
* immediately with the resident build tuples using the inMemoryHashJoin */
for (int i = 0; i < numOfPartitions; i++) {
curPBuff[i] = (pStatus.get(i)) ? nextBuffIxToAlloc++ : BUFFER_FOR_RESIDENT_PARTS;
@@ -559,8 +553,7 @@
}
- public void closeProbe(IFrameWriter writer) throws
- HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+ public void closeProbe(IFrameWriter writer) throws HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
inMemJoiner.join(probeResBuff.getBuffer(), writer);
inMemJoiner.closeJoin(writer);
@@ -593,7 +586,7 @@
}
public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
- return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader());
+ return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createDeleteOnCloseReader());
}
public long getBuildPartitionSize(int pid) {
@@ -605,7 +598,7 @@
}
public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
- return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader());
+ return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createDeleteOnCloseReader());
}
public long getProbePartitionSize(int pid) {
@@ -659,10 +652,10 @@
double avgBuildSpSz = sumOfBuildSpilledSizes / numOfSpilledPartitions;
double avgProbeSpSz = sumOfProbeSpilledSizes / numOfSpilledPartitions;
- String s =
- "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t" + numOfSpilledPartitions
- + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t" + avgProbeSpSz
- + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t" + freeFramesCounter;
+ String s = "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t"
+ + numOfSpilledPartitions + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t"
+ + avgProbeSpSz + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t"
+ + freeFramesCounter;
return s;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 145ec3c..b9c2fb1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -32,7 +32,6 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
public class MaterializerTaskState extends AbstractStateObject {
@@ -68,7 +67,7 @@
}
public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
- RunFileReader in = out.createReader();
+ RunFileReader in = out.createDeleteOnCloseReader();
writer.open();
try {
in.open();
@@ -83,7 +82,7 @@
writer.close();
}
}
-
+
public void deleteFile() {
out.getFileReference().delete();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index cde11a0..b5ba099 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -66,7 +66,7 @@
} finally {
flushWriter.close();
}
- runAndMaxSizes.add(new RunAndMaxFrameSizePair(runWriter.createReader(), maxFlushedFrameSize));
+ runAndMaxSizes.add(new RunAndMaxFrameSizePair(runWriter.createDeleteOnCloseReader(), maxFlushedFrameSize));
getSorter().reset();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 9715b71..d44826e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -63,8 +63,8 @@
public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<RunAndMaxFrameSizePair> runs,
int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
- this(ctx, sorter, runs, sortFields, comparators, nmkComputer, recordDesc, framesLimit,
- Integer.MAX_VALUE, writer);
+ this(ctx, sorter, runs, sortFields, comparators, nmkComputer, recordDesc, framesLimit, Integer.MAX_VALUE,
+ writer);
}
public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<RunAndMaxFrameSizePair> runs,
@@ -118,8 +118,7 @@
while (true) {
int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
- currentGenerationRunAvailable,
- stop);
+ currentGenerationRunAvailable, stop);
prepareFrames(unUsed, inFrames, partialRuns);
if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
@@ -143,7 +142,7 @@
mergedMaxFrameSize = merge(mergeResultWriter, partialRuns);
mergeResultWriter.close();
- reader = mergeFileWriter.createReader();
+ reader = mergeFileWriter.createDeleteOnCloseReader();
}
appendNewRuns(reader, mergedMaxFrameSize);
@@ -196,8 +195,7 @@
return budget;
}
- private void prepareFrames(int extraFreeMem, List<GroupVSizeFrame> inFrames,
- List<RunAndMaxFrameSizePair> patialRuns)
+ private void prepareFrames(int extraFreeMem, List<GroupVSizeFrame> inFrames, List<RunAndMaxFrameSizePair> patialRuns)
throws HyracksDataException {
if (extraFreeMem > 0 && patialRuns.size() > 1) {
int extraFrames = extraFreeMem / ctx.getInitialFrameSize();
@@ -246,14 +244,13 @@
return sortFields;
}
- private int merge(IFrameWriter writer, List<RunAndMaxFrameSizePair> partialRuns)
- throws HyracksDataException {
+ private int merge(IFrameWriter writer, List<RunAndMaxFrameSizePair> partialRuns) throws HyracksDataException {
tempRuns.clear();
for (int i = 0; i < partialRuns.size(); i++) {
tempRuns.add(partialRuns.get(i).run);
}
- RunMergingFrameReader merger = new RunMergingFrameReader(ctx, tempRuns, inFrames, getSortFields(),
- comparators, nmkComputer, recordDesc, topK);
+ RunMergingFrameReader merger = new RunMergingFrameReader(ctx, tempRuns, inFrames, getSortFields(), comparators,
+ nmkComputer, recordDesc, topK);
int maxFrameSize = 0;
int io = 0;
merger.open();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index 3cb5685..5f49dd5 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -116,5 +116,11 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.e-movimento.tinytools</groupId>
+ <artifactId>privilegedaccessor</artifactId>
+ <version>1.2.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index f51e061..1f339152 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -28,8 +28,6 @@
import java.util.Map;
import java.util.Random;
-import org.junit.Test;
-
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -43,7 +41,6 @@
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -53,11 +50,12 @@
import org.apache.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Test;
public abstract class AbstractRunGeneratorTest {
static TestUtils testUtils = new TestUtils();
- static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE };
+ static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
static Random GRandom = new Random(System.currentTimeMillis());
static int[] SortFields = new int[] { 0, 1 };
@@ -74,14 +72,14 @@
abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
throws HyracksDataException;
- protected List<RunAndMaxFrameSizePair> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
- int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+ protected List<RunAndMaxFrameSizePair> testSortRecords(int pageSize, int frameLimit, int numRuns,
+ int minRecordSize, int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
IHyracksTaskContext ctx = testUtils.create(pageSize);
HashMap<Integer, String> keyValuePair = new HashMap<>();
List<IFrame> frameList = new ArrayList<>();
- prepareData(ctx, frameList, pageSize * frameLimit * numRuns, minRecordSize, maxRecordSize,
- specialData, keyValuePair);
+ prepareData(ctx, frameList, pageSize * frameLimit * numRuns, minRecordSize, maxRecordSize, specialData,
+ keyValuePair);
AbstractSortRunGenerator runGenerator = getSortRunGenerator(ctx, frameLimit, keyValuePair.size());
runGenerator.open();
for (IFrame frame : frameList) {
@@ -94,12 +92,6 @@
static void matchResult(IHyracksTaskContext ctx, List<RunAndMaxFrameSizePair> runs,
Map<Integer, String> keyValuePair) throws HyracksDataException {
- IFrame frame = new VSizeFrame(ctx);
- FrameTupleAccessor fta = new FrameTupleAccessor(RecordDesc);
-
- HashMap<Integer, String> copyMap = new HashMap<>(keyValuePair);
- assertReadSorted(runs, fta, frame, copyMap);
-
HashMap<Integer, String> copyMap2 = new HashMap<>(keyValuePair);
int maxFrameSizes = 0;
for (RunAndMaxFrameSizePair run : runs) {
@@ -163,8 +155,8 @@
tb.addField(IntegerSerializerDeserializer.INSTANCE, entry.getKey());
tb.addField(UTF8StringSerializerDeserializer.INSTANCE, entry.getValue());
- VSizeFrame frame = new VSizeFrame(ctx, FrameHelper
- .calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length, tb.getSize(), ctx.getInitialFrameSize()));
+ VSizeFrame frame = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(
+ tb.getFieldEndOffsets().length, tb.getSize(), ctx.getInitialFrameSize()));
appender.reset(frame, true);
assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()));
frameList.add(frame);
@@ -186,9 +178,8 @@
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
frameList.add(frame);
datasize += frame.getFrameSize();
- frame = new VSizeFrame(ctx, FrameHelper
- .calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length, tb.getSize(),
- ctx.getInitialFrameSize()));
+ frame = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length,
+ tb.getSize(), ctx.getInitialFrameSize()));
appender.reset(frame, true);
assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()));
}
@@ -202,8 +193,7 @@
}
- static String generateRandomRecord(int minRecordSize, int maxRecordSize)
- throws HyracksDataException {
+ static String generateRandomRecord(int minRecordSize, int maxRecordSize) throws HyracksDataException {
int size = GRandom.nextInt(maxRecordSize - minRecordSize + 1) + minRecordSize;
return generateRandomFixSizedString(size);
@@ -245,8 +235,8 @@
int numRuns = 2;
int minRecordSize = pageSize;
int maxRecordSize = (int) (pageSize * 1.8);
- List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
- null);
+ List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+ maxRecordSize, null);
assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2);
}
@@ -258,8 +248,8 @@
int minRecordSize = 20;
int maxRecordSize = pageSize / 2;
HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1);
- List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
- specialPair);
+ List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+ maxRecordSize, specialPair);
int max = 0;
for (RunAndMaxFrameSizePair run : size) {
@@ -276,8 +266,8 @@
HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
int minRecordSize = 10;
int maxRecordSize = pageSize / 2;
- List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
- specialPair);
+ List<RunAndMaxFrameSizePair> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
+ maxRecordSize, specialPair);
}
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index cb85755..ca0a6bb 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -19,14 +19,14 @@
package org.apache.hyracks.tests.unit;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.ComparatorFactories;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.GRandom;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.RecordDesc;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.SortFields;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.generateRandomRecord;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.matchResult;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.prepareData;
-import static org.apache.hyracks.tests.unit.ExternalSortRunGeneratorTest.testUtils;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.GRandom;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.generateRandomRecord;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.matchResult;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.prepareData;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.testUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -40,7 +40,7 @@
import java.util.Map;
import java.util.TreeMap;
-import org.junit.Test;
+import junit.extensions.PA;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
@@ -55,17 +55,17 @@
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
import org.apache.hyracks.dataflow.std.sort.RunAndMaxFrameSizePair;
import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
+import org.junit.Test;
public class RunMergingFrameReaderTest {
- static IBinaryComparator[] Comparators = new IBinaryComparator[] {
- ComparatorFactories[0].createBinaryComparator(),
- ComparatorFactories[1].createBinaryComparator(),
- };
+ static IBinaryComparator[] Comparators = new IBinaryComparator[] { ComparatorFactories[0].createBinaryComparator(),
+ ComparatorFactories[1].createBinaryComparator(), };
static class TestFrameReader implements IFrameReader {
@@ -191,8 +191,8 @@
List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
List<TestFrameReader> readerList = new ArrayList<>(numRuns);
List<IFrame> frameList = new ArrayList<>(numRuns);
- prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
- minRecordSize, maxRecordSize, readerList, frameList, keyValueMapList);
+ prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+ frameList, keyValueMapList);
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
null, RecordDesc);
@@ -212,8 +212,8 @@
List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
List<TestFrameReader> readerList = new ArrayList<>(numRuns);
List<IFrame> frameList = new ArrayList<>(numRuns);
- prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
- minRecordSize, maxRecordSize, readerList, frameList, keyValueMapList);
+ prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+ frameList, keyValueMapList);
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
null, RecordDesc);
@@ -234,12 +234,11 @@
List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
List<TestFrameReader> readerList = new ArrayList<>(numRuns);
List<IFrame> frameList = new ArrayList<>(numRuns);
- prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
- minRecordSize, maxRecordSize, readerList, frameList, keyValueMapList);
+ prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize,
+ readerList, frameList, keyValueMapList);
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields,
- Comparators,
- null, RecordDesc, topK);
+ Comparators, null, RecordDesc, topK);
int totoalCount = testMergeSucceedInner(ctx, reader, keyValueMapList);
int newCount = 0;
for (Map<Integer, String> x : keyValueMapList) {
@@ -285,25 +284,23 @@
List<Map<Integer, String>> keyValueMap = new ArrayList<>();
List<TestFrameReader> readerList = new ArrayList<>();
List<IFrame> frameList = new ArrayList<>();
- prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
- minRecordSize, maxRecordSize, readerList, frameList, keyValueMap);
+ prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+ frameList, keyValueMap);
minRecordSize = pageSize;
maxRecordSize = pageSize;
numFramesPerRun = 4;
- prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
- minRecordSize, maxRecordSize, readerList, frameList, keyValueMap);
+ prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+ frameList, keyValueMap);
minRecordSize = pageSize * 2;
maxRecordSize = pageSize * 2;
numFramesPerRun = 6;
- prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun,
- minRecordSize, maxRecordSize, readerList, frameList, keyValueMap);
+ prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
+ frameList, keyValueMap);
- RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields,
- Comparators,
- null,
- RecordDesc);
+ RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
+ null, RecordDesc);
testMergeSucceed(ctx, reader, keyValueMap);
}
@@ -316,15 +313,14 @@
int maxRecordSize = pageSize / 2;
IHyracksTaskContext ctx = testUtils.create(pageSize);
- ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields,
- null, ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT,
- numFramesPerRun);
+ ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null,
+ ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, numFramesPerRun);
runGenerator.open();
Map<Integer, String> keyValuePair = new HashMap<>();
List<IFrame> frameList = new ArrayList<>();
- prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize,
- null, keyValuePair);
+ prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize, null,
+ keyValuePair);
for (IFrame frame : frameList) {
runGenerator.nextFrame(frame.getBuffer());
}
@@ -333,18 +329,24 @@
minRecordSize = pageSize;
maxRecordSize = pageSize;
frameList.clear();
- prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize,
- null, keyValuePair);
+ prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize, null,
+ keyValuePair);
for (IFrame frame : frameList) {
runGenerator.nextFrame(frame.getBuffer());
}
-
runGenerator.close();
List<IFrame> inFrame = new ArrayList<>(runGenerator.getRuns().size());
for (RunAndMaxFrameSizePair max : runGenerator.getRuns()) {
inFrame.add(new GroupVSizeFrame(ctx, max.maxFrameSize));
}
+
+ // Let each run file reader not delete the run file when it is read and closed.
+ for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
+ RunFileReader runFileReader = (RunFileReader) run.run;
+ PA.setValue(runFileReader, "deleteAfterClose", false);
+ }
matchResult(ctx, runGenerator.getRuns(), keyValuePair);
+
List<IFrameReader> runs = new ArrayList<>();
for (RunAndMaxFrameSizePair run : runGenerator.getRuns()) {
runs.add(run.run);
@@ -399,10 +401,9 @@
}
}
- static void prepareRandomInputRunList(IHyracksTaskContext ctx, int pageSize, int numRuns,
- int numFramesPerRun, int minRecordSize, int maxRecordSize,
- List<TestFrameReader> readerList, List<IFrame> frameList, List<Map<Integer, String>> keyValueMap)
- throws HyracksDataException {
+ static void prepareRandomInputRunList(IHyracksTaskContext ctx, int pageSize, int numRuns, int numFramesPerRun,
+ int minRecordSize, int maxRecordSize, List<TestFrameReader> readerList, List<IFrame> frameList,
+ List<Map<Integer, String>> keyValueMap) throws HyracksDataException {
for (int i = 0; i < numRuns; i++) {
readerList.add(new TestFrameReader(pageSize, numFramesPerRun, minRecordSize, maxRecordSize));
frameList.add(new VSizeFrame(ctx, readerList.get(readerList.size() - 1).maxFrameSize));