Optimized FrameSorter to make repeated invocations more efficient

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@507 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 3742e91..710ef38 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -35,14 +35,18 @@
     private final int[] sortFields;
     private final INormalizedKeyComputer nkc;
     private final IBinaryComparator[] comparators;
-    private final RecordDescriptor recordDescriptor;
     private final List<ByteBuffer> buffers;
 
     private final FrameTupleAccessor fta1;
     private final FrameTupleAccessor fta2;
 
+    private final FrameTupleAppender appender;
+
+    private final ByteBuffer outFrame;
+
     private int dataFrameCount;
     private int[] tPointers;
+    private int tupleCount;
 
     public FrameSorter(IHyracksCommonContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
@@ -54,39 +58,24 @@
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        this.recordDescriptor = recordDescriptor;
         buffers = new ArrayList<ByteBuffer>();
         fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        outFrame = ctx.allocateFrame();
 
         dataFrameCount = 0;
     }
 
     public void reset() {
         dataFrameCount = 0;
-        tPointers = null;
+        tupleCount = 0;
     }
 
     public int getFrameCount() {
         return dataFrameCount;
     }
 
-    /**
-     * Gets the sorted tuple pointers.
-     * A tuple is "pointed" to by 4 entries in the tPointers array.
-     * [0] = Frame index in the "Frames" list.
-     * [1] = Start offset of the tuple in the frame
-     * [2] = Length of the tuple
-     * [3] = Poor man's normalized key for the tuple.
-     */
-    public int[] getTPointers() {
-        return tPointers;
-    }
-
-    public List<ByteBuffer> getFrames() {
-        return buffers;
-    }
-
     public void insertFrame(ByteBuffer buffer) {
         ByteBuffer copyFrame;
         if (dataFrameCount == buffers.size()) {
@@ -100,55 +89,50 @@
     }
 
     public void sortFrames() {
-        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         int nBuffers = dataFrameCount;
-        int totalTCount = 0;
+        tupleCount = 0;
         for (int i = 0; i < nBuffers; ++i) {
-            accessor.reset(buffers.get(i));
-            totalTCount += accessor.getTupleCount();
+            fta1.reset(buffers.get(i));
+            tupleCount += fta1.getTupleCount();
         }
         int sfIdx = sortFields[0];
-        tPointers = new int[totalTCount * 4];
+        tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
         int ptr = 0;
         for (int i = 0; i < nBuffers; ++i) {
-            accessor.reset(buffers.get(i));
-            int tCount = accessor.getTupleCount();
-            byte[] array = accessor.getBuffer().array();
+            fta1.reset(buffers.get(i));
+            int tCount = fta1.getTupleCount();
+            byte[] array = fta1.getBuffer().array();
             for (int j = 0; j < tCount; ++j) {
-                int tStart = accessor.getTupleStartOffset(j);
-                int tEnd = accessor.getTupleEndOffset(j);
+                int tStart = fta1.getTupleStartOffset(j);
+                int tEnd = fta1.getTupleEndOffset(j);
                 tPointers[ptr * 4] = i;
                 tPointers[ptr * 4 + 1] = tStart;
                 tPointers[ptr * 4 + 2] = tEnd;
-                int f0StartRel = accessor.getFieldStartOffset(j, sfIdx);
-                int f0EndRel = accessor.getFieldEndOffset(j, sfIdx);
-                int f0Start = f0StartRel + tStart + accessor.getFieldSlotsLength();
+                int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
+                int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
+                int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
                 tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
                 ++ptr;
             }
         }
-        if (tPointers.length > 0) {
-            sort(tPointers, 0, totalTCount);
+        if (tupleCount > 0) {
+            sort(tPointers, 0, tupleCount);
         }
     }
 
     public void flushFrames(IFrameWriter writer) throws HyracksDataException {
-        FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-        ByteBuffer outFrame = ctx.allocateFrame();
         writer.open();
         appender.reset(outFrame, true);
-        int n = tPointers.length / 4;
-        for (int ptr = 0; ptr < n; ++ptr) {
+        for (int ptr = 0; ptr < tupleCount; ++ptr) {
             int i = tPointers[ptr * 4];
             int tStart = tPointers[ptr * 4 + 1];
             int tEnd = tPointers[ptr * 4 + 2];
             ByteBuffer buffer = buffers.get(i);
-            accessor.reset(buffer);
-            if (!appender.append(accessor, tStart, tEnd)) {
+            fta1.reset(buffer);
+            if (!appender.append(fta1, tStart, tEnd)) {
                 FrameUtils.flushFrame(outFrame, writer);
                 appender.reset(outFrame, true);
-                if (!appender.append(accessor, tStart, tEnd)) {
+                if (!appender.append(fta1, tStart, tEnd)) {
                     throw new IllegalStateException();
                 }
             }
@@ -255,4 +239,4 @@
         }
         return 0;
     }
-}
+}
\ No newline at end of file