Adding Optimization for the merge step in external sort
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@965 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 98621f5..59cf056 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -53,7 +53,7 @@
private ByteBuffer outFrame;
private FrameTupleAppender outFrameAppender;
- private final FrameSorter frameSorter; //Used in External sort, no replacement selection
+ private FrameSorter frameSorter; //Used in External sort, no replacement selection
private FrameTupleAccessor outFrameAccessor; //Used in External sort, with replacement selection
private final int outputLimit; //Used in External sort, with replacement selection and limit on output size
private int currentSize; //Used in External sort, with replacement selection and limit on output size
@@ -95,7 +95,15 @@
if (frameSorter != null && frameSorter.getFrameCount() > 0) {
frameSorter.flushFrames(writer);
}
+ /** recycle sort buffer */
+ frameSorter = null;
+ System.gc();
+
} else {
+ /** recycle sort buffer */
+ frameSorter = null;
+ System.gc();
+
inFrames = new ArrayList<ByteBuffer>();
outFrame = ctx.allocateFrame();
outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
@@ -162,15 +170,12 @@
public void processWithReplacementSelection() throws HyracksDataException {
writer.open();
-
try {
outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
outFrame = ctx.allocateFrame();
outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
outFrameAppender.reset(outFrame, true);
-
if (runs.size() == 1) {
-
if (outputLimit < 1) {
runs.get(0).open();
ByteBuffer nextFrame = ctx.allocateFrame();
@@ -178,9 +183,10 @@
FrameUtils.flushFrame(nextFrame, writer);
outFrameAppender.reset(nextFrame, true);
}
+ System.gc();
return;
}
-
+ //Limit on the output size
int totalCount = 0;
runs.get(0).open();
FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
@@ -203,16 +209,15 @@
totalCount++;
}
}
-
if (outFrameAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
outFrameAppender.reset(outFrame, true);
}
-
+ System.gc();
return;
}
-
// More than one run, actual merging is needed
+ System.gc();
inFrames = new ArrayList<ByteBuffer>();
for (int i = 0; i < framesLimit - 1; ++i) {
inFrames.add(ctx.allocateFrame());
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index e283c33..6b73c54 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -187,9 +187,10 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
-
+
+ int necessaryFrames = Math.min(runs.size() + 2, memSize);
ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, outputLimit, runs, sortFields,
- comparators, recordDescriptors[0], memSize, writer);
+ comparators, recordDescriptors[0], necessaryFrames, writer);
merger.processWithReplacementSelection();