Fixed file handle leaks

git-svn-id: https://hyracks.googlecode.com/svn/trunk@141 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 1012098..4c0e715 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -121,7 +121,7 @@
                         try {
                             createRunFromInFrames(inFrames.size());
                         } catch (Exception e) {
-                            e.printStackTrace();
+                            throw new HyracksDataException(e);
                         }
                     }
                     ByteBuffer copy;
@@ -324,7 +324,7 @@
                                 try {
                                     doPass(runs, passCount);
                                 } catch (Exception e) {
-                                    e.printStackTrace();
+                                    throw new HyracksDataException(e);
                                 }
                             }
                         }
@@ -336,7 +336,7 @@
                 }
 
                 // creates a new run from runs that can fit in memory.
-                private void doPass(LinkedList<File> runs, int passCount) throws ClassNotFoundException, Exception {
+                private void doPass(LinkedList<File> runs, int passCount) throws HyracksDataException, IOException {
                     File newRun = null;
                     IFrameWriter writer = this.writer;
                     boolean finalPass = false;
@@ -349,51 +349,58 @@
                         newRun = ctx.getResourceManager().createFile(
                                 ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
                         writer = new RunFileWriter(newRun);
+                        writer.open();
                     }
-                    RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
-                    FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
-                    Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-                    ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDescriptors[0],
-                            inFrames.size(), comparator);
-                    int[] tupleIndexes = new int[inFrames.size()];
-                    for (int i = 0; i < inFrames.size(); i++) {
-                        tupleIndexes[i] = 0;
-                        int runIndex = topTuples.peek().getRunid();
-                        runCursors[runIndex] = new RunFileReader(runs.get(runIndex));
-                        runCursors[runIndex].open();
-                        if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
-                            tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-                            tupleAccessors[runIndex].reset(inFrames.get(runIndex));
-                            setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-                        } else {
-                            closeRun(runIndex, runCursors, tupleAccessors);
-                        }
-                    }
-
-                    while (!topTuples.areRunsExhausted()) {
-                        ReferenceEntry top = topTuples.peek();
-                        int runIndex = top.getRunid();
-                        FrameTupleAccessor fta = top.getAccessor();
-                        int tupleIndex = top.getTupleIndex();
-
-                        if (!outFrameAppender.append(fta, tupleIndex)) {
-                            flushFrame(outFrame, writer);
-                            outFrameAppender.reset(outFrame, true);
-                            if (!outFrameAppender.append(fta, tupleIndex)) {
-                                throw new IllegalStateException();
+                    try {
+                        RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
+                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+                        Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDescriptors[0],
+                                inFrames.size(), comparator);
+                        int[] tupleIndexes = new int[inFrames.size()];
+                        for (int i = 0; i < inFrames.size(); i++) {
+                            tupleIndexes[i] = 0;
+                            int runIndex = topTuples.peek().getRunid();
+                            runCursors[runIndex] = new RunFileReader(runs.get(runIndex));
+                            runCursors[runIndex].open();
+                            if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
+                                tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDescriptors[0]);
+                                tupleAccessors[runIndex].reset(inFrames.get(runIndex));
+                                setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+                            } else {
+                                closeRun(runIndex, runCursors, tupleAccessors);
                             }
                         }
 
-                        ++tupleIndexes[runIndex];
-                        setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-                    }
-                    if (outFrameAppender.getTupleCount() > 0) {
-                        flushFrame(outFrame, writer);
-                        outFrameAppender.reset(outFrame, true);
-                    }
-                    runs.subList(0, inFrames.size()).clear();
-                    if (!finalPass) {
-                        runs.add(0, newRun);
+                        while (!topTuples.areRunsExhausted()) {
+                            ReferenceEntry top = topTuples.peek();
+                            int runIndex = top.getRunid();
+                            FrameTupleAccessor fta = top.getAccessor();
+                            int tupleIndex = top.getTupleIndex();
+
+                            if (!outFrameAppender.append(fta, tupleIndex)) {
+                                flushFrame(outFrame, writer);
+                                outFrameAppender.reset(outFrame, true);
+                                if (!outFrameAppender.append(fta, tupleIndex)) {
+                                    throw new IllegalStateException();
+                                }
+                            }
+
+                            ++tupleIndexes[runIndex];
+                            setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+                        }
+                        if (outFrameAppender.getTupleCount() > 0) {
+                            flushFrame(outFrame, writer);
+                            outFrameAppender.reset(outFrame, true);
+                        }
+                        runs.subList(0, inFrames.size()).clear();
+                        if (!finalPass) {
+                            runs.add(0, newRun);
+                        }
+                    } finally {
+                        if (!finalPass) {
+                            writer.close();
+                        }
                     }
                 }
 
@@ -425,7 +432,9 @@
                     }
                 }
 
-                private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor) {
+                private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+                        throws HyracksDataException {
+                    runCursors[index].close();
                     runCursors[index] = null;
                     tupleAccessor[index] = null;
                 }
@@ -535,10 +544,12 @@
 
         @Override
         public void close() throws HyracksDataException {
-            try {
-                channel.close();
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
+            if (channel != null) {
+                try {
+                    channel.close();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
             }
         }