merge with fullstack_hybridhashgby r2861

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2864 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 974ef3b..f2b56fa 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -255,7 +255,8 @@
                 }
 
                 outputAppender.reset(outputFrame, true);
-                //writer.open();
+
+                writer.open();
 
                 if (tPointers == null) {
                     // Not sorted
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
index 57a364b..6e85cff 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortGroupHashTable.java
@@ -279,7 +279,6 @@
      */
     public void flushHashtableToOutput(IFrameWriter outputWriter) throws HyracksDataException {
 
-        // FIXME: remove this 
         outputAppender.reset(outputBuffer, true);
         for (int i = 0; i < contents.length; i++) {
             if (contents[i] == null) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
index 8695e0b..a846e4a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hashsort/HybridHashSortRunMerger.java
@@ -66,8 +66,7 @@
     }
 
     public void process() throws HyracksDataException {
-
-        writer.open();
+
         // FIXME
         int mergeLevels = 0, mergeRunCount = 0;
         try {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
index d147a53..b325b83 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupHashTable.java
@@ -263,7 +263,7 @@
 
     private void insert(FrameTupleAccessor accessor, int tupleIndex) throws HyracksDataException {
 
-    	if (isPartitionOnly) {
+        if (isPartitionOnly) {
             // for partition only
             int pid = partitionComputer.partition(accessor, tupleIndex, tableSize) % numOfPartitions;
             insertSpilledPartition(accessor, tupleIndex, pid);
@@ -582,7 +582,7 @@
         }
     }
 
-    public List<Integer> getSpilledRunsSizeInTuples() throws HyracksDataException {
+    public List<Integer> getSpilledRunsSizeInRawTuples() throws HyracksDataException {
         return spilledPartRunSizesInTuples;
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
index dcefc59..0cc718d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashGroupOperatorDescriptor.java
@@ -17,6 +17,7 @@
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -97,10 +98,6 @@
     private final boolean doInputAdjustment;
 
     private final static double FUDGE_FACTOR_ESTIMATION = 1.2;
-    
-    
-      
-    private int open_times ; 
 
     public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
             long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int tableSize,
@@ -111,7 +108,6 @@
         this(spec, keyFields, framesLimit, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
                 comparatorFactories, hashFamilies, hashFuncStartLevel, firstNormalizerFactory, aggregatorFactory,
                 mergerFactory, outRecDesc, true);
-        open_times = 0;
     }
 
     public HybridHashGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
@@ -153,8 +149,6 @@
         recordDescriptors[0] = outRecDesc;
 
         this.doInputAdjustment = doInputAdjustment;
-        
-        open_times = 0;
     }
 
     @Override
@@ -177,7 +171,8 @@
 
             HybridHashGroupHashTable topProcessor;
 
-            int observedInputSizeInFrames;
+            int observedInputSizeInRawTuples;
+            int observedInputSizeInFrames, maxRecursiveLevels;
 
             int userProvidedInputSizeInFrames;
 
@@ -240,13 +235,11 @@
 
                 writer.open();
                 topProcessor.open();
-                
-                open_times += 1;
-                System.err.println(open_times);
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                observedInputSizeInRawTuples += buffer.getInt(buffer.capacity() - 4);
                 observedInputSizeInFrames++;
                 topProcessor.nextFrame(buffer);
             }
@@ -259,65 +252,24 @@
 
             @Override
             public void close() throws HyracksDataException {
-                topProcessor.finishup();
+                // estimate the maximum recursive levels
+                maxRecursiveLevels = (int) Math.max(
+                        Math.ceil(Math.log(observedInputSizeInFrames * fudgeFactor) / Math.log(framesLimit)) + 1, 1);
 
-                List<IFrameReader> runs = topProcessor.getSpilledRuns();
-                List<Integer> runsSizeInFrames = topProcessor.getSpilledRunsSizeInPages();
-
-                // get statistics from the hash table
-                int hashedKeys = topProcessor.getHashedUniqueKeys();
-                int hashedRawRecords = topProcessor.getHashedRawRecords();
-
-                // Get the raw record size of each partition (in records but not in pages)
-                List<Integer> partitionRawRecordsCount = topProcessor.getSpilledRunsSizeInTuples();
-
-                topProcessor.close();
-
-                // get a new estimation on the number of keys in the input data set: if the previous level is pure-partition, 
-                // then use the size inputed in the previous level; otherwise, compute the key ratio in the data set based on
-                // the processed keys.
-                int newKeySizeInPages = (doInputAdjustment && hashedRawRecords > 0) ? (int) Math
-                        .ceil((double) hashedKeys / hashedRawRecords * observedInputSizeInFrames) : (int) Math
-                        .ceil(userProvidedInputSizeInFrames);
-
-                IFrameReader runReader;
-                int runSizeInFrames;
-                int partitionRawRecords;
-
-                while (!runs.isEmpty()) {
-
-                    runReader = runs.remove(0);
-                    runSizeInFrames = runsSizeInFrames.remove(0);
-                    partitionRawRecords = partitionRawRecordsCount.remove(0);
-
-                    // compute the estimated key size in frames for the run file
-                    int runKeySize;
-
-                    if (doInputAdjustment && hashedRawRecords > 0)
-                        runKeySize = (int) Math.ceil((double) newKeySizeInPages * runSizeInFrames
-                                / observedInputSizeInFrames);
-                    else
-                        runKeySize = (int) Math.ceil((double) userProvidedInputSizeInFrames * partitionRawRecords
-                                / inputSizeInRawRecords);
-
-                    if (topLevelFallbackCheck && runKeySize > HYBRID_FALLBACK_THRESHOLD * newKeySizeInPages) {
-                        fallBack(runReader, runSizeInFrames, runKeySize, 1);
-                    } else {
-                        processRunFiles(runReader, runKeySize, 1);
-                    }
-                }
+                finishAndRecursion(topProcessor, observedInputSizeInRawTuples, inputSizeInUniqueKeys, 0,
+                        topLevelFallbackCheck);
 
                 writer.close();
 
             }
 
-            private void processRunFiles(IFrameReader runReader, int uniqueKeysOfRunFileInFrames, int runLevel)
+            private void processRunFiles(IFrameReader runReader, int inputCardinality, int runLevel)
                     throws HyracksDataException {
 
                 boolean checkFallback = true;
 
-                int numOfPartitions = getNumberOfPartitions(tableSize, framesLimit, uniqueKeysOfRunFileInFrames,
-                        fudgeFactor);
+                int numOfPartitions = getNumberOfPartitions(tableSize, framesLimit, inputCardinality
+                        * userProvidedRecordSizeInBytes / frameSize, fudgeFactor);
 
                 HybridHashGroupHashTable processor = new HybridHashGroupHashTable(ctx, framesLimit, tableSize,
                         numOfPartitions, keyFields, runLevel, comparators, tpcf, aggregatorFactory.createAggregator(
@@ -328,70 +280,82 @@
 
                 runReader.open();
 
-                int inputRunRawSizeInFrames = 0, inputRunRawSizeInTuples = 0;
+                int inputRunRawSizeInTuples = 0;
 
                 if (readAheadBuf == null) {
                     readAheadBuf = ctx.allocateFrame();
                 }
                 while (runReader.nextFrame(readAheadBuf)) {
-                    inputRunRawSizeInFrames++;
                     inputRunRawSizeInTuples += readAheadBuf.getInt(readAheadBuf.capacity() - 4);
                     processor.nextFrame(readAheadBuf);
                 }
 
                 runReader.close();
 
-                processor.finishup();
+                finishAndRecursion(processor, inputRunRawSizeInTuples, inputCardinality, runLevel, checkFallback);
+            }
 
-                List<IFrameReader> runs = processor.getSpilledRuns();
-                List<Integer> runSizes = processor.getSpilledRunsSizeInPages();
-                List<Integer> partitionRawRecords = processor.getSpilledRunsSizeInTuples();
+            /**
+             * Finish the in-memory processing of the given hash table and recursively process its spilled runs.
+             *
+             * @param ht the hash table that processed the input at this level; it is finished up and closed here
+             * @param inputRawRecordCount the number of raw input records fed into this hash table
+             * @param inputCardinality the estimated number of unique keys in this input
+             * @param level the current recursion level (0 for the top level)
+             * @param checkFallback whether to check for falling back to the hash-sort algorithm
+             * @throws HyracksDataException
+             */
+            private void finishAndRecursion(HybridHashGroupHashTable ht, long inputRawRecordCount,
+                    long inputCardinality, int level, boolean checkFallback) throws HyracksDataException {
 
-                int directFlushKeysInTuples = processor.getHashedUniqueKeys();
-                int directFlushRawRecordsInTuples = processor.getHashedRawRecords();
+                ht.finishup();
 
-                processor.close();
+                List<IFrameReader> generatedRunReaders = ht.getSpilledRuns();
+                List<Integer> partitionRawRecords = ht.getSpilledRunsSizeInRawTuples();
 
-                int newKeySizeInPages = (doInputAdjustment && directFlushRawRecordsInTuples > 0) ? (int) Math
-                        .ceil((double) directFlushKeysInTuples / directFlushRawRecordsInTuples
-                                * inputRunRawSizeInFrames) : uniqueKeysOfRunFileInFrames;
+                int directFlushKeysInTuples = ht.getHashedUniqueKeys();
+                int directFlushRawRecordsInTuples = ht.getHashedRawRecords();
+
+                ht.close();
+                ht = null;
+
+                ctx.getCounterContext().getCounter("optional.levels." + level + ".estiInputKeyCardinality", true)
+                        .update(inputCardinality);
+
+                // adjust the input cardinality estimate using the key/record ratio observed in memory
+                if (doInputAdjustment && directFlushRawRecordsInTuples > 0) {
+                    inputCardinality = (int) Math.ceil((double) directFlushKeysInTuples / directFlushRawRecordsInTuples
+                            * inputRawRecordCount);
+                }
+
+                ctx.getCounterContext()
+                        .getCounter("optional.levels." + level + ".estiInputKeyCardinalityAdjusted", true)
+                        .update(inputCardinality);
 
                 IFrameReader recurRunReader;
-                int runSizeInPages, subPartitionRawRecords;
+                int subPartitionRawRecords;
 
-                while (!runs.isEmpty()) {
-                    recurRunReader = runs.remove(0);
-                    runSizeInPages = runSizes.remove(0);
+                while (!generatedRunReaders.isEmpty()) {
+                    recurRunReader = generatedRunReaders.remove(0);
                     subPartitionRawRecords = partitionRawRecords.remove(0);
 
-                    int newRunKeySize;
+                    int runKeyCardinality = (int) Math.ceil((double) inputCardinality * subPartitionRawRecords
+                            / inputRawRecordCount);
 
-                    if (doInputAdjustment && directFlushRawRecordsInTuples > 0) {
-                        // do adjustment
-                        newRunKeySize = (int) Math.ceil((double) newKeySizeInPages * runSizeInPages
-                                / inputRunRawSizeInFrames);
+                    if ((checkFallback && runKeyCardinality > HYBRID_FALLBACK_THRESHOLD * inputCardinality)
+                            || level > maxRecursiveLevels) {
+                        Logger.getLogger(HybridHashGroupOperatorDescriptor.class.getSimpleName()).warning(
+                                "Hybrid-hash falls back to hash-sort algorithm! (" + level + ":" + maxRecursiveLevels
+                                        + ")");
+                        fallback(recurRunReader, level);
                     } else {
-                        // no adjustment
-                        newRunKeySize = (int) Math.ceil((double) subPartitionRawRecords * uniqueKeysOfRunFileInFrames
-                                / inputRunRawSizeInTuples);
-                    }
-
-                    if (checkFallback && newRunKeySize > HYBRID_FALLBACK_THRESHOLD * newKeySizeInPages) {
-                        fallBack(recurRunReader, runSizeInPages, newRunKeySize, runLevel);
-                    } else {
-                        processRunFiles(recurRunReader, newRunKeySize, runLevel + 1);
+                        processRunFiles(recurRunReader, runKeyCardinality, level + 1);
                     }
 
                 }
             }
 
-            private void fallBack(IFrameReader recurRunReader, int runSizeInPages, int runKeySizeInPages, int runLevel)
-                    throws HyracksDataException {
-                fallbackHashSortAlgorithm(recurRunReader, runLevel + 1);
-            }
-
-            private void fallbackHashSortAlgorithm(IFrameReader recurRunReader, int runLevel)
-                    throws HyracksDataException {
+            private void fallback(IFrameReader recurRunReader, int runLevel) throws HyracksDataException {
                 // fall back
                 FrameTupleAccessor runFrameTupleAccessor = new FrameTupleAccessor(frameSize, inRecDesc);
                 HybridHashSortGroupHashTable hhsTable = new HybridHashSortGroupHashTable(ctx, framesLimit, tableSize,
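Note on the arithmetic used by the new finishAndRecursion() path above: the standalone Java sketch below mirrors its three estimates (the recursion-depth cap, the cardinality adjustment, and the fallback test). The class and method names are illustrative only, not Hyracks APIs, and the sample numbers in main() are made up.

    public final class HybridHashEstimates {

        // Cap the recursion depth at roughly log_{framesLimit}(inputFrames * fudgeFactor) + 1, as in close().
        static int maxRecursiveLevels(int observedInputSizeInFrames, double fudgeFactor, int framesLimit) {
            return (int) Math.max(
                    Math.ceil(Math.log(observedInputSizeInFrames * fudgeFactor) / Math.log(framesLimit)) + 1, 1);
        }

        // Scale the unique-key/record ratio observed in the in-memory part of the hash table
        // up to the full raw input; keep the previous estimate if nothing was hashed in memory.
        static long adjustedCardinality(long hashedUniqueKeys, long hashedRawRecords, long inputRawRecordCount,
                long previousEstimate) {
            if (hashedRawRecords <= 0) {
                return previousEstimate;
            }
            return (long) Math.ceil((double) hashedUniqueKeys / hashedRawRecords * inputRawRecordCount);
        }

        // A spilled run falls back to the hash-sort algorithm when its estimated key cardinality is
        // disproportionately large, or when the recursion has exceeded the estimated maximum depth.
        static boolean shouldFallBack(long runKeyCardinality, long inputCardinality, int level, int maxLevels,
                double fallbackThreshold) {
            return runKeyCardinality > fallbackThreshold * inputCardinality || level > maxLevels;
        }

        public static void main(String[] args) {
            // 100,000 observed input frames, fudge factor 1.2, 256 frames of memory:
            // ceil(log(120000) / log(256)) + 1 = 3 + 1 = 4 levels at most.
            System.out.println(maxRecursiveLevels(100_000, 1.2, 256));
            // 2,000 unique keys among 50,000 hashed records, 1,000,000 raw records in total:
            // adjusted cardinality of 40,000 unique keys.
            System.out.println(adjustedCardinality(2_000, 50_000, 1_000_000, 500_000));
        }
    }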
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java
deleted file mode 100644
index 666c172..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hybridhash/HybridHashPartitionGenerateFrameWriter.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.group.hybridhash;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-
-public class HybridHashPartitionGenerateFrameWriter implements IFrameWriter {
-    private final IHyracksTaskContext ctx;
-
-    private RunFileWriter[] partitions;
-
-    private int[] partitionRawSizesInFrames;
-    private int[] partitionRawSizesInTuples;
-
-    private List<IFrameReader> partitionRunReaders;
-    private List<Integer> partitionRunAggregatedPages;
-    private List<Integer> partitionSizeInFrames;
-    private List<Integer> partitionSizeInTuples;
-
-    private ByteBuffer[] outputBuffers;
-
-    private final int numOfPartitions;
-
-    private final ITuplePartitionComputer tpc;
-
-    private final FrameTupleAccessor inFrameTupleAccessor;
-
-    private final FrameTupleAppender outFrameTupleAppender;
-
-    public HybridHashPartitionGenerateFrameWriter(IHyracksTaskContext ctx, int numOfPartitions,
-            ITuplePartitionComputer tpc, RecordDescriptor inRecDesc) {
-        this.ctx = ctx;
-        this.numOfPartitions = numOfPartitions;
-        this.tpc = tpc;
-        this.partitions = new RunFileWriter[numOfPartitions];
-        this.outputBuffers = new ByteBuffer[numOfPartitions];
-        this.partitionRawSizesInTuples = new int[numOfPartitions];
-        this.partitionRawSizesInFrames = new int[numOfPartitions];
-        this.inFrameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecDesc);
-        this.outFrameTupleAppender = new FrameTupleAppender(ctx.getFrameSize());
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#open()
-     */
-    @Override
-    public void open() throws HyracksDataException {
-        // TODO Auto-generated method stub
-
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer)
-     */
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        inFrameTupleAccessor.reset(buffer);
-        int tupleCount = inFrameTupleAccessor.getTupleCount();
-        for (int i = 0; i < tupleCount; i++) {
-            int pid = tpc.partition(inFrameTupleAccessor, i, numOfPartitions);
-
-            ctx.getCounterContext().getCounter("optional.partition.insert.count", true).update(1);
-
-            if (outputBuffers[pid] == null) {
-                outputBuffers[pid] = ctx.allocateFrame();
-            }
-            outFrameTupleAppender.reset(outputBuffers[pid], false);
-
-            if (!outFrameTupleAppender.append(inFrameTupleAccessor, i)) {
-                // flush the output buffer
-                if (partitions[pid] == null) {
-                    partitions[pid] = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
-                            HybridHashPartitionGenerateFrameWriter.class.getSimpleName()), ctx.getIOManager());
-                    partitions[pid].open();
-                }
-                FrameUtils.flushFrame(outputBuffers[pid], partitions[pid]);
-                partitionRawSizesInFrames[pid]++;
-                outFrameTupleAppender.reset(outputBuffers[pid], true);
-                if (!outFrameTupleAppender.append(inFrameTupleAccessor, i)) {
-                    throw new HyracksDataException(
-                            "Failed to insert a record into its partition: the record size is too large. ");
-                }
-            }
-            partitionRawSizesInTuples[pid]++;
-        }
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#fail()
-     */
-    @Override
-    public void fail() throws HyracksDataException {
-        throw new HyracksDataException("Failed on hash partitioning.");
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see edu.uci.ics.hyracks.api.comm.IFrameWriter#close()
-     */
-    @Override
-    public void close() throws HyracksDataException {
-        outputBuffers = null;
-        for (RunFileWriter partWriter : partitions) {
-            if (partWriter != null)
-                partWriter.close();
-        }
-    }
-
-    public void finishup() throws HyracksDataException {
-        for (int i = 0; i < outputBuffers.length; i++) {
-            if (outputBuffers[i] == null) {
-                continue;
-            }
-            if (partitions[i] == null) {
-                partitions[i] = new RunFileWriter(ctx.getJobletContext().createManagedWorkspaceFile(
-                        HybridHashPartitionGenerateFrameWriter.class.getSimpleName()), ctx.getIOManager());
-                partitions[i].open();
-            }
-            outFrameTupleAppender.reset(outputBuffers[i], false);
-            if (outFrameTupleAppender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(outputBuffers[i], partitions[i]);
-                partitionRawSizesInFrames[i]++;
-                outputBuffers[i] = null;
-            }
-        }
-
-        partitionRunReaders = new LinkedList<IFrameReader>();
-        partitionSizeInFrames = new LinkedList<Integer>();
-        partitionSizeInTuples = new LinkedList<Integer>();
-        partitionRunAggregatedPages = new LinkedList<Integer>();
-
-        for (int i = 0; i < numOfPartitions; i++) {
-            if (partitions[i] != null) {
-                partitionRunReaders.add(partitions[i].createReader());
-                partitionRunAggregatedPages.add(0);
-                partitions[i].close();
-                partitionSizeInFrames.add(partitionRawSizesInFrames[i]);
-                partitionSizeInTuples.add(partitionRawSizesInTuples[i]);
-            }
-        }
-
-    }
-
-    public List<IFrameReader> getSpilledRuns() throws HyracksDataException {
-        return partitionRunReaders;
-    }
-
-    public List<Integer> getSpilledRunsSizeInPages() throws HyracksDataException {
-        return partitionSizeInFrames;
-    }
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index d1fec29..fd4f8da 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -29,20 +29,26 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 
 public class PreclusteredGroupWriter implements IFrameWriter {
+
+    private final static int INT_SIZE = 4;
+
     private final int[] groupFields;
     private final IBinaryComparator[] comparators;
     private final IAggregatorDescriptor aggregator;
     private final AggregateState aggregateState;
     private final IFrameWriter writer;
-    private final ByteBuffer copyFrame;
     private final FrameTupleAccessor inFrameAccessor;
-    private final FrameTupleAccessor copyFrameAccessor;
 
     private final ByteBuffer outFrame;
     private final FrameTupleAppender appender;
     private final ArrayTupleBuilder tupleBuilder;
 
-    private boolean first;
+    private final RecordDescriptor outRecordDesc;
+
+    private byte[] groupResultCache;
+    private ByteBuffer groupResultCacheBuffer;
+    private FrameTupleAccessor groupResultCacheAccessor;
+    private FrameTupleAppender groupResultCacheAppender;
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
@@ -52,10 +58,9 @@
         this.aggregator = aggregator;
         this.aggregateState = aggregator.createAggregateStates();
         this.writer = writer;
-        copyFrame = ctx.allocateFrame();
+        this.outRecordDesc = outRecordDesc;
+
         inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
-        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
-        copyFrameAccessor.reset(copyFrame);
 
         outFrame = ctx.allocateFrame();
         appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -67,7 +72,6 @@
     @Override
     public void open() throws HyracksDataException {
         writer.open();
-        first = true;
     }
 
     @Override
@@ -75,40 +79,45 @@
         inFrameAccessor.reset(buffer);
         int nTuples = inFrameAccessor.getTupleCount();
         for (int i = 0; i < nTuples; ++i) {
-            if (first) {
 
-                tupleBuilder.reset();
-                for (int j = 0; j < groupFields.length; j++) {
-                    tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
-                }
-                aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
-
-                first = false;
-
-            } else {
-                if (i == 0) {
-                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+            if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
+                groupResultCacheAccessor.reset(ByteBuffer.wrap(groupResultCache));
+                if (sameGroup(inFrameAccessor, i, groupResultCacheAccessor, 0)) {
+                    // find match: do aggregation
+                    aggregator.aggregate(inFrameAccessor, i, groupResultCacheAccessor, 0, aggregateState);
+                    continue;
                 } else {
-                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                    // write the cached group into the final output
+                    writeOutput(groupResultCacheAccessor, 0);
                 }
-
             }
-        }
-        FrameUtils.copy(buffer, copyFrame);
-    }
-
-    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
-            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
-        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
-            writeOutput(prevTupleAccessor, prevTupleIndex);
 
             tupleBuilder.reset();
+
             for (int j = 0; j < groupFields.length; j++) {
-                tupleBuilder.addField(currTupleAccessor, currTupleIndex, groupFields[j]);
+                tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
             }
-            aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex, aggregateState);
-        } else {
-            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+
+            aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+
+            // enlarge the cache buffer if necessary
+            int requiredSize = tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * INT_SIZE + 2
+                    * INT_SIZE;
+
+            if (groupResultCache == null || groupResultCache.length < requiredSize) {
+                groupResultCache = new byte[requiredSize];
+                groupResultCacheAppender = new FrameTupleAppender(groupResultCache.length);
+                groupResultCacheBuffer = ByteBuffer.wrap(groupResultCache);
+                groupResultCacheAccessor = new FrameTupleAccessor(groupResultCache.length, outRecordDesc);
+            }
+
+            groupResultCacheAppender.reset(groupResultCacheBuffer, true);
+            if (!groupResultCacheAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize())) {
+                throw new HyracksDataException("The partial aggregation result is too large to fit into a frame.");
+            }
+
+            groupResultCacheAccessor.reset(groupResultCacheBuffer);
         }
     }
 
@@ -117,7 +126,7 @@
 
         tupleBuilder.reset();
         for (int j = 0; j < groupFields.length; j++) {
-            tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
+            tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, j);
         }
         aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
 
@@ -138,8 +147,8 @@
             int fIdx = groupFields[i];
             int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
             int l1 = a1.getFieldLength(t1Idx, fIdx);
-            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
-            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, i);
+            int l2 = a2.getFieldLength(t2Idx, i);
             if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
                 return false;
             }
@@ -154,8 +163,8 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (!first) {
-            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+        if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
+            writeOutput(groupResultCacheAccessor, 0);
             if (appender.getTupleCount() > 0) {
                 FrameUtils.flushFrame(outFrame, writer);
             }
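For reference, the rewritten PreclusteredGroupWriter above no longer keeps a copy of the previous input frame; it caches only the current group's key fields and partial aggregate in a small reusable buffer, flushing it whenever a group boundary is seen or close() is called. The standalone sketch below shows the same control flow with deliberately simplified types (int key, long sum); it is illustrative only and does not use the Hyracks frame or aggregator APIs.

    import java.util.ArrayList;
    import java.util.List;

    public final class PreclusteredSumSketch {

        private boolean hasCurrent;   // analogous to "groupResultCache != null && tupleCount > 0"
        private int currentKey;       // analogous to the cached group key fields
        private long currentSum;      // analogous to the cached partial aggregate state

        private final List<String> output = new ArrayList<>();

        // Input must be pre-clustered: all records of a group arrive consecutively.
        public void next(int key, long value) {
            if (hasCurrent && key == currentKey) {
                currentSum += value;          // same group: aggregate into the cached partial result
                return;
            }
            if (hasCurrent) {
                flushCurrent();               // group boundary: emit the cached result
            }
            currentKey = key;                 // start caching a new group
            currentSum = value;
            hasCurrent = true;
        }

        public void close() {
            if (hasCurrent) {
                flushCurrent();               // emit the last group, as the new close() does
            }
        }

        private void flushCurrent() {
            output.add(currentKey + " -> " + currentSum);
        }

        public static void main(String[] args) {
            PreclusteredSumSketch g = new PreclusteredSumSketch();
            int[][] rows = { {1, 10}, {1, 5}, {2, 7}, {3, 1}, {3, 2} };
            for (int[] r : rows) {
                g.next(r[0], r[1]);
            }
            g.close();
            System.out.println(g.output); // [1 -> 15, 2 -> 7, 3 -> 3]
        }
    }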