Fixed file handle leaks
git-svn-id: https://hyracks.googlecode.com/svn/trunk@141 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 1012098..4c0e715 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -121,7 +121,7 @@
try {
createRunFromInFrames(inFrames.size());
} catch (Exception e) {
- e.printStackTrace();
+ throw new HyracksDataException(e);
}
}
ByteBuffer copy;
@@ -324,7 +324,7 @@
try {
doPass(runs, passCount);
} catch (Exception e) {
- e.printStackTrace();
+ throw new HyracksDataException(e);
}
}
}
@@ -336,7 +336,7 @@
}
// creates a new run from runs that can fit in memory.
- private void doPass(LinkedList<File> runs, int passCount) throws ClassNotFoundException, Exception {
+ private void doPass(LinkedList<File> runs, int passCount) throws HyracksDataException, IOException {
File newRun = null;
IFrameWriter writer = this.writer;
boolean finalPass = false;
@@ -349,51 +349,58 @@
newRun = ctx.getResourceManager().createFile(
ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
writer = new RunFileWriter(newRun);
+ writer.open();
}
- RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
- Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDescriptors[0],
- inFrames.size(), comparator);
- int[] tupleIndexes = new int[inFrames.size()];
- for (int i = 0; i < inFrames.size(); i++) {
- tupleIndexes[i] = 0;
- int runIndex = topTuples.peek().getRunid();
- runCursors[runIndex] = new RunFileReader(runs.get(runIndex));
- runCursors[runIndex].open();
- if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
- tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDescriptors[0]);
- tupleAccessors[runIndex].reset(inFrames.get(runIndex));
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- } else {
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- while (!topTuples.areRunsExhausted()) {
- ReferenceEntry top = topTuples.peek();
- int runIndex = top.getRunid();
- FrameTupleAccessor fta = top.getAccessor();
- int tupleIndex = top.getTupleIndex();
-
- if (!outFrameAppender.append(fta, tupleIndex)) {
- flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!outFrameAppender.append(fta, tupleIndex)) {
- throw new IllegalStateException();
+ try {
+ RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
+ FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+ Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+ ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDescriptors[0],
+ inFrames.size(), comparator);
+ int[] tupleIndexes = new int[inFrames.size()];
+ for (int i = 0; i < inFrames.size(); i++) {
+ tupleIndexes[i] = 0;
+ int runIndex = topTuples.peek().getRunid();
+ runCursors[runIndex] = new RunFileReader(runs.get(runIndex));
+ runCursors[runIndex].open();
+ if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
+ tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDescriptors[0]);
+ tupleAccessors[runIndex].reset(inFrames.get(runIndex));
+ setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+ } else {
+ closeRun(runIndex, runCursors, tupleAccessors);
}
}
- ++tupleIndexes[runIndex];
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- }
- if (outFrameAppender.getTupleCount() > 0) {
- flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- }
- runs.subList(0, inFrames.size()).clear();
- if (!finalPass) {
- runs.add(0, newRun);
+ while (!topTuples.areRunsExhausted()) {
+ ReferenceEntry top = topTuples.peek();
+ int runIndex = top.getRunid();
+ FrameTupleAccessor fta = top.getAccessor();
+ int tupleIndex = top.getTupleIndex();
+
+ if (!outFrameAppender.append(fta, tupleIndex)) {
+ flushFrame(outFrame, writer);
+ outFrameAppender.reset(outFrame, true);
+ if (!outFrameAppender.append(fta, tupleIndex)) {
+ throw new IllegalStateException();
+ }
+ }
+
+ ++tupleIndexes[runIndex];
+ setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+ }
+ if (outFrameAppender.getTupleCount() > 0) {
+ flushFrame(outFrame, writer);
+ outFrameAppender.reset(outFrame, true);
+ }
+ runs.subList(0, inFrames.size()).clear();
+ if (!finalPass) {
+ runs.add(0, newRun);
+ }
+ } finally {
+ if (!finalPass) {
+ writer.close();
+ }
}
}
@@ -425,7 +432,9 @@
}
}
- private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor) {
+ private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+ throws HyracksDataException {
+ runCursors[index].close();
runCursors[index] = null;
tupleAccessor[index] = null;
}
@@ -535,10 +544,12 @@
@Override
public void close() throws HyracksDataException {
- try {
- channel.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
+ if (channel != null) {
+ try {
+ channel.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
}