reformat the source code

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@981 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
index 000e459..1744b99 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
@@ -88,8 +88,8 @@
     protected abstract IRecordWriter createRecordWriter(FileSplit fileSplit, int index) throws Exception;
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions) {
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition),
                 recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 0096576..7cf437d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -86,11 +86,7 @@
     }
 
     private enum State {
-        INIT,
-        IN_RECORD,
-        EOR,
-        CR,
-        EOF
+        INIT, IN_RECORD, EOR, CR, EOF
     }
 
     private class FieldCursor {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index d85f437..76cb0ae 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -44,8 +44,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions) {
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         final FileSplit split = fileSplitProvider.getFileSplits()[partition];
         final ITupleParser tp = tupleParserFactory.createTupleParser(ctx);
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
index 7ce8594..8397d29 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
@@ -39,8 +39,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            final int partition, int nPartitions) {
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
         final FileSplit[] splits = fileSplitProvider.getFileSplits();
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             private OutputStream out;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 47a3158..6bf29d2 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -56,8 +56,15 @@
         this.delim = delim;
     }
 
-    /* (non-Javadoc)
-     * @see edu.uci.ics.hyracks.api.dataflow.IActivityNode#createPushRuntime(edu.uci.ics.hyracks.api.context.IHyracksContext, edu.uci.ics.hyracks.api.job.IOperatorEnvironment, edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int, int)
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * edu.uci.ics.hyracks.api.dataflow.IActivityNode#createPushRuntime(edu.
+     * uci.ics.hyracks.api.context.IHyracksContext,
+     * edu.uci.ics.hyracks.api.job.IOperatorEnvironment,
+     * edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider, int,
+     * int)
      */
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
index 38ac1a3..af345aa 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
@@ -69,11 +69,14 @@
 
     @Override
     protected void configure() throws Exception {
-        // currently a no-op, but is meant to initialize , if required before it is asked 
+        // currently a no-op, but is meant to initialize , if required before it
+        // is asked
         // to create a record reader
-        // this is executed at the node and is useful for operators that could not be 
-        // initialized from the client completely, because of lack of information specific 
-        // to the node where the operator gets executed. 
+        // this is executed at the node and is useful for operators that could
+        // not be
+        // initialized from the client completely, because of lack of
+        // information specific
+        // to the node where the operator gets executed.
 
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
index 79a62d1..e72f85c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AggregateState.java
@@ -20,25 +20,25 @@
  *
  */
 public class AggregateState implements Serializable {
-    
+
     private static final long serialVersionUID = 1L;
-    
+
     public Object state = null;
-    
-    public AggregateState(){
+
+    public AggregateState() {
         state = null;
     }
-    
-    public AggregateState(Object obj){
+
+    public AggregateState(Object obj) {
         state = obj;
     }
 
     public void reset() {
         state = null;
     }
-    
+
     public void close() {
         state = null;
     }
-    
+
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index 07da19b..370f6d0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -120,6 +120,6 @@
     @Override
     public void fail() throws HyracksDataException {
         // TODO Auto-generated method stub
-        
+
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 2edbf97..0cc47b8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -74,14 +74,10 @@
     private final ISpillableTableFactory spillableTableFactory;
     private final boolean isOutputSorted;
 
-    public ExternalGroupOperatorDescriptor(JobSpecification spec,
-            int[] keyFields, int framesLimit,
-            IBinaryComparatorFactory[] comparatorFactories,
-            INormalizedKeyComputerFactory firstNormalizerFactory,
-            IAggregatorDescriptorFactory aggregatorFactory,
-            IAggregatorDescriptorFactory mergerFactory,
-            RecordDescriptor recordDescriptor,
-            ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
+    public ExternalGroupOperatorDescriptor(JobSpecification spec, int[] keyFields, int framesLimit,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, IAggregatorDescriptorFactory mergerFactory,
+            RecordDescriptor recordDescriptor, ISpillableTableFactory spillableTableFactory, boolean isOutputSorted) {
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
         if (framesLimit <= 1) {
@@ -89,9 +85,7 @@
              * Minimum of 2 frames: 1 for input records, and 1 for output
              * aggregation results.
              */
-            throw new IllegalStateException(
-                    "frame limit should at least be 2, but it is "
-                            + framesLimit + "!");
+            throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
         }
         this.aggregatorFactory = aggregatorFactory;
         this.mergerFactory = mergerFactory;
@@ -117,10 +111,8 @@
      */
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(
-                getOperatorId(), AGGREGATE_ACTIVITY_ID));
-        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId,
-                MERGE_ACTIVITY_ID));
+        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
+        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
 
         builder.addActivity(aggregateAct);
         builder.addSourceEdge(0, aggregateAct, 0);
@@ -162,35 +154,28 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(
-                final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions)
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(
-                    ctx.getFrameSize(),
-                    recordDescProvider.getInputRecordDescriptor(
-                            getOperatorId(), 0));
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private AggregateActivityState state;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new AggregateActivityState(ctx.getJobletContext()
-                            .getJobId(), new TaskId(getActivityId(), partition));
+                    state = new AggregateActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
                     state.runs = new LinkedList<RunFileReader>();
-                    state.gTable = spillableTableFactory.buildSpillableTable(
-                            ctx, keyFields, comparatorFactories,
+                    state.gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
                             firstNormalizerFactory, aggregatorFactory,
-                            recordDescProvider.getInputRecordDescriptor(
-                                    getOperatorId(), 0), recordDescriptors[0],
+                            recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
                             ExternalGroupOperatorDescriptor.this.framesLimit);
                     state.gTable.reset();
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer)
-                        throws HyracksDataException {
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                     accessor.reset(buffer);
                     int tupleCount = accessor.getTupleCount();
                     for (int i = 0; i < tupleCount; i++) {
@@ -230,15 +215,12 @@
                 private void flushFramesToRun() throws HyracksDataException {
                     FileReference runFile;
                     try {
-                        runFile = ctx.getJobletContext()
-                                .createManagedWorkspaceFile(
-                                        ExternalGroupOperatorDescriptor.class
-                                                .getSimpleName());
+                        runFile = ctx.getJobletContext().createManagedWorkspaceFile(
+                                ExternalGroupOperatorDescriptor.class.getSimpleName());
                     } catch (IOException e) {
                         throw new HyracksDataException(e);
                     }
-                    RunFileWriter writer = new RunFileWriter(runFile,
-                            ctx.getIOManager());
+                    RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
                     writer.open();
                     try {
                         state.gTable.sortFrames();
@@ -264,15 +246,12 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(
-                final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions)
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i]
-                        .createBinaryComparator();
+                comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
 
             int[] keyFieldsInPartialResults = new int[keyFields.length];
@@ -280,12 +259,9 @@
                 keyFieldsInPartialResults[i] = i;
             }
 
-            final IAggregatorDescriptor aggregator = mergerFactory
-                    .createAggregator(ctx, recordDescriptors[0],
-                            recordDescriptors[0], keyFields,
-                            keyFieldsInPartialResults);
-            final AggregateState aggregateState = aggregator
-                    .createAggregateStates();
+            final IAggregatorDescriptor aggregator = mergerFactory.createAggregator(ctx, recordDescriptors[0],
+                    recordDescriptors[0], keyFields, keyFieldsInPartialResults);
+            final AggregateState aggregateState = aggregator.createAggregateStates();
 
             final int[] storedKeys = new int[keyFields.length];
             /**
@@ -295,8 +271,7 @@
                 storedKeys[i] = i;
             }
 
-            final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
-                    recordDescriptors[0].getFields().length);
+            final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
 
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 /**
@@ -308,8 +283,7 @@
                  * Output frame.
                  */
                 private ByteBuffer outFrame, writerFrame;
-                private final FrameTupleAppender outAppender = new FrameTupleAppender(
-                        ctx.getFrameSize());
+                private final FrameTupleAppender outAppender = new FrameTupleAppender(ctx.getFrameSize());
                 private FrameTupleAppender writerAppender;
 
                 private LinkedList<RunFileReader> runs;
@@ -326,14 +300,12 @@
                 private int[] currentFrameIndexInRun;
                 private int[] currentRunFrames;
 
-                private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(
-                        ctx.getFrameSize(), recordDescriptors[0]);
+                private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                        recordDescriptors[0]);
 
                 public void initialize() throws HyracksDataException {
-                    aggState = (AggregateActivityState) ctx
-                            .getTaskState(new TaskId(new ActivityId(
-                                    getOperatorId(), AGGREGATE_ACTIVITY_ID),
-                                    partition));
+                    aggState = (AggregateActivityState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                            AGGREGATE_ACTIVITY_ID), partition));
                     runs = aggState.runs;
                     writer.open();
                     try {
@@ -368,8 +340,7 @@
                     }
                 }
 
-                private void doPass(LinkedList<RunFileReader> runs)
-                        throws HyracksDataException {
+                private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
                     FileReference newRun = null;
                     IFrameWriter writer = this.writer;
                     boolean finalPass = false;
@@ -384,10 +355,8 @@
                         runNumber = runs.size();
                     } else {
                         runNumber = framesLimit - 2;
-                        newRun = ctx.getJobletContext()
-                                .createManagedWorkspaceFile(
-                                        ExternalGroupOperatorDescriptor.class
-                                                .getSimpleName());
+                        newRun = ctx.getJobletContext().createManagedWorkspaceFile(
+                                ExternalGroupOperatorDescriptor.class.getSimpleName());
                         writer = new RunFileWriter(newRun, ctx.getIOManager());
                         writer.open();
                     }
@@ -399,12 +368,10 @@
                          * the ones fit into the inFrames
                          */
                         RunFileReader[] runFileReaders = new RunFileReader[runNumber];
-                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames
-                                .size()];
+                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
                         Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(
-                                ctx.getFrameSize(), recordDescriptors[0],
-                                runNumber, comparator);
+                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
+                                recordDescriptors[0], runNumber, comparator);
                         /**
                          * current tuple index in each run
                          */
@@ -417,26 +384,19 @@
                             runFileReaders[runIndex].open();
 
                             currentRunFrames[runIndex] = 0;
-                            currentFrameIndexInRun[runIndex] = runIndex
-                                    * runFrameLimit;
+                            currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
                             for (int j = 0; j < runFrameLimit; j++) {
-                                int frameIndex = currentFrameIndexInRun[runIndex]
-                                        + j;
-                                if (runFileReaders[runIndex].nextFrame(inFrames
-                                        .get(frameIndex))) {
-                                    tupleAccessors[frameIndex] = new FrameTupleAccessor(
-                                            ctx.getFrameSize(),
+                                int frameIndex = currentFrameIndexInRun[runIndex] + j;
+                                if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
+                                    tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
                                             recordDescriptors[0]);
-                                    tupleAccessors[frameIndex].reset(inFrames
-                                            .get(frameIndex));
+                                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
                                     currentRunFrames[runIndex]++;
                                     if (j == 0)
-                                        setNextTopTuple(runIndex, tupleIndices,
-                                                runFileReaders, tupleAccessors,
+                                        setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
                                                 topTuples);
                                 } else {
-                                    closeRun(runIndex, runFileReaders,
-                                            tupleAccessors);
+                                    closeRun(runIndex, runFileReaders, tupleAccessors);
                                     break;
                                 }
                             }
@@ -454,12 +414,9 @@
                             int runIndex = topTuples.peek().getRunid();
                             FrameTupleAccessor fta = top.getAccessor();
 
-                            int currentTupleInOutFrame = outFrameAccessor
-                                    .getTupleCount() - 1;
+                            int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
                             if (currentTupleInOutFrame < 0
-                                    || compareFrameTuples(fta, tupleIndex,
-                                            outFrameAccessor,
-                                            currentTupleInOutFrame) != 0) {
+                                    || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
                                 /**
                                  * Initialize the first output record Reset the
                                  * tuple builder
@@ -467,18 +424,13 @@
 
                                 tupleBuilder.reset();
 
-                                aggregator.init(tupleBuilder, fta, tupleIndex,
-                                        aggregateState);
+                                aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
 
-                                if (!outAppender.appendSkipEmptyField(
-                                        tupleBuilder.getFieldEndOffsets(),
-                                        tupleBuilder.getByteArray(), 0,
-                                        tupleBuilder.getSize())) {
+                                if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                                        tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
                                     flushOutFrame(writer, finalPass);
-                                    if (!outAppender.appendSkipEmptyField(
-                                            tupleBuilder.getFieldEndOffsets(),
-                                            tupleBuilder.getByteArray(), 0,
-                                            tupleBuilder.getSize())) {
+                                    if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
                                         throw new HyracksDataException(
                                                 "The partial result is too large to be initialized in a frame.");
                                     }
@@ -491,14 +443,12 @@
                                  * outFrame
                                  */
 
-                                aggregator.aggregate(fta, tupleIndex,
-                                        outFrameAccessor,
-                                        currentTupleInOutFrame, aggregateState);
+                                aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame,
+                                        aggregateState);
 
                             }
                             tupleIndices[runIndex]++;
-                            setNextTopTuple(runIndex, tupleIndices,
-                                    runFileReaders, tupleAccessors, topTuples);
+                            setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
                         }
 
                         if (outAppender.getTupleCount() > 0) {
@@ -523,12 +473,10 @@
                     }
                 }
 
-                private void flushOutFrame(IFrameWriter writer, boolean isFinal)
-                        throws HyracksDataException {
+                private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
 
                     if (finalTupleBuilder == null) {
-                        finalTupleBuilder = new ArrayTupleBuilder(
-                                recordDescriptors[0].getFields().length);
+                        finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
                     }
 
                     if (writerFrame == null) {
@@ -536,8 +484,7 @@
                     }
 
                     if (writerAppender == null) {
-                        writerAppender = new FrameTupleAppender(
-                                ctx.getFrameSize());
+                        writerAppender = new FrameTupleAppender(ctx.getFrameSize());
                         writerAppender.reset(writerFrame, true);
                     }
 
@@ -549,21 +496,19 @@
 
                         if (isFinal) {
 
-                            aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i,
-                                    aggregateState);
-
+                            aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
 
                         } else {
 
-                            aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i,
-                                    aggregateState);
+                            aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
                         }
-                        
-                        
-                        if(!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(), finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+
+                        if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+                                finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
                             FrameUtils.flushFrame(writerFrame, writer);
                             writerAppender.reset(writerFrame, true);
-                            if(!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(), finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+                            if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+                                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
                                 throw new HyracksDataException(
                                         "Aggregation output is too large to be fit into a frame.");
                             }
@@ -573,19 +518,16 @@
                         FrameUtils.flushFrame(writerFrame, writer);
                         writerAppender.reset(writerFrame, true);
                     }
-                    
+
                     outAppender.reset(outFrame, true);
                 }
 
-                private void setNextTopTuple(int runIndex, int[] tupleIndices,
-                        RunFileReader[] runCursors,
-                        FrameTupleAccessor[] tupleAccessors,
-                        ReferencedPriorityQueue topTuples)
+                private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+                        FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
                         throws HyracksDataException {
                     int runStart = runIndex * runFrameLimit;
                     boolean existNext = false;
-                    if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null
-                            || runCursors[runIndex] == null) {
+                    if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
                         /**
                          * run already closed
                          */
@@ -595,8 +537,7 @@
                          * not the last frame for this run
                          */
                         existNext = true;
-                        if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]]
-                                .getTupleCount()) {
+                        if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
                             tupleIndices[runIndex] = 0;
                             currentFrameIndexInRun[runIndex]++;
                         }
@@ -619,15 +560,13 @@
                          */
                         currentRunFrames[runIndex] = 0;
                         for (int j = 0; j < runFrameLimit; j++, frameOffset++) {
-                            ByteBuffer buffer = tupleAccessors[frameOffset]
-                                    .getBuffer();
+                            ByteBuffer buffer = tupleAccessors[frameOffset].getBuffer();
                             if (runCursors[runIndex].nextFrame(buffer)) {
                                 tupleAccessors[frameOffset].reset(buffer);
                                 if (tupleAccessors[frameOffset].getTupleCount() > 0) {
                                     existNext = true;
                                 } else {
-                                    throw new IllegalStateException(
-                                            "illegal: empty run file");
+                                    throw new IllegalStateException("illegal: empty run file");
                                 }
                                 currentRunFrames[runIndex]++;
                             } else {
@@ -637,10 +576,8 @@
                     }
 
                     if (existNext) {
-                        topTuples
-                                .popAndReplace(
-                                        tupleAccessors[currentFrameIndexInRun[runIndex]],
-                                        tupleIndices[runIndex]);
+                        topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
+                                tupleIndices[runIndex]);
                     } else {
                         topTuples.pop();
                         closeRun(runIndex, runCursors, tupleAccessors);
@@ -656,8 +593,7 @@
                  * @param tupleAccessor
                  * @throws HyracksDataException
                  */
-                private void closeRun(int index, RunFileReader[] runCursors,
-                        IFrameTupleAccessor[] tupleAccessor)
+                private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
                         throws HyracksDataException {
                     if (runCursors[index] != null) {
                         runCursors[index].close();
@@ -666,18 +602,15 @@
                     }
                 }
 
-                private int compareFrameTuples(IFrameTupleAccessor fta1,
-                        int j1, IFrameTupleAccessor fta2, int j2) {
+                private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
                     byte[] b1 = fta1.getBuffer().array();
                     byte[] b2 = fta2.getBuffer().array();
                     for (int f = 0; f < keyFields.length; ++f) {
                         int fIdx = f;
-                        int s1 = fta1.getTupleStartOffset(j1)
-                                + fta1.getFieldSlotsLength()
+                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
                                 + fta1.getFieldStartOffset(j1, fIdx);
                         int l1 = fta1.getFieldLength(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2)
-                                + fta2.getFieldSlotsLength()
+                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
                                 + fta2.getFieldStartOffset(j2, fIdx);
                         int l2_start = fta2.getFieldStartOffset(j2, fIdx);
                         int l2_end = fta2.getFieldEndOffset(j2, fIdx);
@@ -693,32 +626,25 @@
             return op;
         }
 
-        private Comparator<ReferenceEntry> createEntryComparator(
-                final IBinaryComparator[] comparators) {
+        private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
             return new Comparator<ReferenceEntry>() {
 
                 @Override
                 public int compare(ReferenceEntry o1, ReferenceEntry o2) {
-                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1
-                            .getAccessor();
-                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2
-                            .getAccessor();
+                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
                     int j1 = o1.getTupleIndex();
                     int j2 = o2.getTupleIndex();
                     byte[] b1 = fta1.getBuffer().array();
                     byte[] b2 = fta2.getBuffer().array();
                     for (int f = 0; f < keyFields.length; ++f) {
                         int fIdx = f;
-                        int s1 = fta1.getTupleStartOffset(j1)
-                                + fta1.getFieldSlotsLength()
+                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
                                 + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldEndOffset(j1, fIdx)
-                                - fta1.getFieldStartOffset(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2)
-                                + fta2.getFieldSlotsLength()
+                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
                                 + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2 = fta2.getFieldEndOffset(j2, fIdx)
-                                - fta2.getFieldStartOffset(j2, fIdx);
+                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
                         int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
                         if (c != 0) {
                             return c;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
index ee29c56..2346160 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/FrameToolsForGroupers.java
@@ -25,20 +25,17 @@
  */
 public class FrameToolsForGroupers {
 
-    public static void writeFields(byte[] buf, int offset, int length,
-            ArrayTupleBuilder tupleBuilder) throws HyracksDataException {
-        writeFields(buf, offset, length, tupleBuilder.getFieldEndOffsets(),
-                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
+    public static void writeFields(byte[] buf, int offset, int length, ArrayTupleBuilder tupleBuilder)
+            throws HyracksDataException {
+        writeFields(buf, offset, length, tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                tupleBuilder.getSize());
     }
 
-    public static void writeFields(byte[] buf, int offset, int length,
-            int[] fieldsOffset, byte[] data, int dataOffset, int dataLength)
-            throws HyracksDataException {
+    public static void writeFields(byte[] buf, int offset, int length, int[] fieldsOffset, byte[] data, int dataOffset,
+            int dataLength) throws HyracksDataException {
         if (dataLength + 4 * fieldsOffset.length > length) {
-            throw new HyracksDataException(
-                    "Out of buffer bound: try to write too much data ("
-                            + dataLength + ") to the given bound (" + length
-                            + ").");
+            throw new HyracksDataException("Out of buffer bound: try to write too much data (" + dataLength
+                    + ") to the given bound (" + length + ").");
         }
 
         ByteBuffer buffer = ByteBuffer.wrap(buf, offset, length);
@@ -48,60 +45,49 @@
         buffer.put(data, dataOffset, dataLength);
     }
 
-    public static void updateFrameMetaForNewTuple(ByteBuffer buffer,
-            int addedTupleLength) throws HyracksDataException {
-        int currentTupleCount = buffer.getInt(FrameHelper
-                .getTupleCountOffset(buffer.capacity()));
-        int currentTupleEndOffset = buffer
-                .getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
-                        * currentTupleCount);
+    public static void updateFrameMetaForNewTuple(ByteBuffer buffer, int addedTupleLength) throws HyracksDataException {
+        int currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
+        int currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+                * currentTupleCount);
         int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
 
         // update tuple end offset
-        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
-                * (currentTupleCount + 1), newTupleEndOffset);
+        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4 * (currentTupleCount + 1),
+                newTupleEndOffset);
         // Update the tuple count
-        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()),
-                currentTupleCount + 1);
+        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()), currentTupleCount + 1);
     }
 
-    public static void updateFrameMetaForNewTuple(ByteBuffer buffer,
-            int addedTupleLength, boolean isReset) throws HyracksDataException {
+    public static void updateFrameMetaForNewTuple(ByteBuffer buffer, int addedTupleLength, boolean isReset)
+            throws HyracksDataException {
         int currentTupleCount;
         int currentTupleEndOffset;
         if (isReset) {
             currentTupleCount = 0;
             currentTupleEndOffset = 0;
         } else {
-            currentTupleCount = buffer.getInt(FrameHelper
-                    .getTupleCountOffset(buffer.capacity()));
-            currentTupleEndOffset = buffer.getInt(FrameHelper
-                    .getTupleCountOffset(buffer.capacity())
-                    - 4
+            currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
+            currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
                     * currentTupleCount);
         }
         int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
 
         // update tuple end offset
-        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
-                * (currentTupleCount + 1), newTupleEndOffset);
+        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4 * (currentTupleCount + 1),
+                newTupleEndOffset);
         // Update the tuple count
-        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()),
-                currentTupleCount + 1);
+        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()), currentTupleCount + 1);
     }
 
     public static boolean isFrameOverflowing(ByteBuffer buffer, int length, boolean isReset)
             throws HyracksDataException {
-        
-        int currentTupleCount = buffer.getInt(FrameHelper
-                .getTupleCountOffset(buffer.capacity()));
-        if(currentTupleCount == 0 || isReset){
+
+        int currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
+        if (currentTupleCount == 0 || isReset) {
             return length + 4 + 4 > buffer.capacity();
         }
-        int currentTupleEndOffset = buffer
-                .getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
-                        * currentTupleCount);
-        return currentTupleEndOffset + length + 4 + (currentTupleCount + 1) * 4 > buffer
-                .capacity();
+        int currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
+                * currentTupleCount);
+        return currentTupleEndOffset + length + 4 + (currentTupleCount + 1) * 4 > buffer.capacity();
     }
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
index ef427d1..43bb0ae 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
@@ -34,242 +34,218 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 
 class GroupingHashTable {
-	/**
-	 * The pointers in the link store 3 int values for each entry in the
-	 * hashtable: (bufferIdx, tIndex, accumulatorIdx).
-	 * 
-	 * @author vinayakb
-	 */
-	private static class Link {
-		private static final int INIT_POINTERS_SIZE = 9;
+    /**
+     * The pointers in the link store 3 int values for each entry in the
+     * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+     * 
+     * @author vinayakb
+     */
+    private static class Link {
+        private static final int INIT_POINTERS_SIZE = 9;
 
-		int[] pointers;
-		int size;
+        int[] pointers;
+        int size;
 
-		Link() {
-			pointers = new int[INIT_POINTERS_SIZE];
-			size = 0;
-		}
+        Link() {
+            pointers = new int[INIT_POINTERS_SIZE];
+            size = 0;
+        }
 
-		void add(int bufferIdx, int tIndex, int accumulatorIdx) {
-			while (size + 3 > pointers.length) {
-				pointers = Arrays.copyOf(pointers, pointers.length * 2);
-			}
-			pointers[size++] = bufferIdx;
-			pointers[size++] = tIndex;
-			pointers[size++] = accumulatorIdx;
-		}
-	}
+        void add(int bufferIdx, int tIndex, int accumulatorIdx) {
+            while (size + 3 > pointers.length) {
+                pointers = Arrays.copyOf(pointers, pointers.length * 2);
+            }
+            pointers[size++] = bufferIdx;
+            pointers[size++] = tIndex;
+            pointers[size++] = accumulatorIdx;
+        }
+    }
 
-	private static final int INIT_AGG_STATE_SIZE = 8;
-	private final IHyracksTaskContext ctx;
+    private static final int INIT_AGG_STATE_SIZE = 8;
+    private final IHyracksTaskContext ctx;
 
-	private final List<ByteBuffer> buffers;
-	private final Link[] table;
-	/**
-	 * Aggregate states: a list of states for all groups maintained in the main
-	 * memory.
-	 */
-	private AggregateState[] aggregateStates;
-	private int accumulatorSize;
+    private final List<ByteBuffer> buffers;
+    private final Link[] table;
+    /**
+     * Aggregate states: a list of states for all groups maintained in the main
+     * memory.
+     */
+    private AggregateState[] aggregateStates;
+    private int accumulatorSize;
 
-	private int lastBIndex;
-	private final int[] storedKeys;
-	private final int[] keys;
-	private final IBinaryComparator[] comparators;
-	private final FrameTuplePairComparator ftpc;
-	private final ITuplePartitionComputer tpc;
-	private final IAggregatorDescriptor aggregator;
+    private int lastBIndex;
+    private final int[] storedKeys;
+    private final int[] keys;
+    private final IBinaryComparator[] comparators;
+    private final FrameTuplePairComparator ftpc;
+    private final ITuplePartitionComputer tpc;
+    private final IAggregatorDescriptor aggregator;
 
-	private final FrameTupleAppender appender;
+    private final FrameTupleAppender appender;
 
-	private final FrameTupleAccessor storedKeysAccessor;
+    private final FrameTupleAccessor storedKeysAccessor;
 
-	private final ArrayTupleBuilder stateTupleBuilder, outputTupleBuilder;
+    private final ArrayTupleBuilder stateTupleBuilder, outputTupleBuilder;
 
-	GroupingHashTable(IHyracksTaskContext ctx, int[] fields,
-			IBinaryComparatorFactory[] comparatorFactories,
-			ITuplePartitionComputerFactory tpcf,
-			IAggregatorDescriptorFactory aggregatorFactory,
-			RecordDescriptor inRecordDescriptor,
-			RecordDescriptor outRecordDescriptor, int tableSize)
-			throws HyracksDataException {
-		this.ctx = ctx;
+    GroupingHashTable(IHyracksTaskContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
+            ITuplePartitionComputerFactory tpcf, IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize)
+            throws HyracksDataException {
+        this.ctx = ctx;
 
-		buffers = new ArrayList<ByteBuffer>();
-		table = new Link[tableSize];
+        buffers = new ArrayList<ByteBuffer>();
+        table = new Link[tableSize];
 
-		keys = fields;
-		storedKeys = new int[fields.length];
-		@SuppressWarnings("rawtypes")
-		ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
-		for (int i = 0; i < fields.length; ++i) {
-			storedKeys[i] = i;
-			storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
-		}
+        keys = fields;
+        storedKeys = new int[fields.length];
+        @SuppressWarnings("rawtypes")
+        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[fields.length];
+        for (int i = 0; i < fields.length; ++i) {
+            storedKeys[i] = i;
+            storedKeySerDeser[i] = inRecordDescriptor.getFields()[fields[i]];
+        }
 
-		comparators = new IBinaryComparator[comparatorFactories.length];
-		for (int i = 0; i < comparatorFactories.length; ++i) {
-			comparators[i] = comparatorFactories[i].createBinaryComparator();
-		}
-		ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
-		tpc = tpcf.createPartitioner();
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        ftpc = new FrameTuplePairComparator(fields, storedKeys, comparators);
+        tpc = tpcf.createPartitioner();
 
-		int[] keyFieldsInPartialResults = new int[fields.length];
-		for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-			keyFieldsInPartialResults[i] = i;
-		}
+        int[] keyFieldsInPartialResults = new int[fields.length];
+        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+            keyFieldsInPartialResults[i] = i;
+        }
 
-		this.aggregator = aggregatorFactory.createAggregator(ctx,
-				inRecordDescriptor, outRecordDescriptor, fields,
-				keyFieldsInPartialResults);
+        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, fields,
+                keyFieldsInPartialResults);
 
-		this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
-		accumulatorSize = 0;
+        this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
+        accumulatorSize = 0;
 
-		RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(
-				storedKeySerDeser);
-		storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-				storedKeysRecordDescriptor);
-		lastBIndex = -1;
+        RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
+        storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
+        lastBIndex = -1;
 
-		appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender = new FrameTupleAppender(ctx.getFrameSize());
 
-		addNewBuffer();
+        addNewBuffer();
 
-		if (fields.length < outRecordDescriptor.getFields().length) {
-			stateTupleBuilder = new ArrayTupleBuilder(
-					outRecordDescriptor.getFields().length);
-		} else {
-			stateTupleBuilder = new ArrayTupleBuilder(
-					outRecordDescriptor.getFields().length + 1);
-		}
-		outputTupleBuilder = new ArrayTupleBuilder(
-				outRecordDescriptor.getFields().length);
-	}
+        if (fields.length < outRecordDescriptor.getFields().length) {
+            stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+        } else {
+            stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
+        }
+        outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+    }
 
-	private void addNewBuffer() {
-		ByteBuffer buffer = ctx.allocateFrame();
-		buffer.position(0);
-		buffer.limit(buffer.capacity());
-		buffers.add(buffer);
-		appender.reset(buffer, true);
-		++lastBIndex;
-	}
+    private void addNewBuffer() {
+        ByteBuffer buffer = ctx.allocateFrame();
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        buffers.add(buffer);
+        appender.reset(buffer, true);
+        ++lastBIndex;
+    }
 
-	void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
-		int entry = tpc.partition(accessor, tIndex, table.length);
-		Link link = table[entry];
-		if (link == null) {
-			link = table[entry] = new Link();
-		}
-		int saIndex = -1;
-		for (int i = 0; i < link.size; i += 3) {
-			int sbIndex = link.pointers[i];
-			int stIndex = link.pointers[i + 1];
-			storedKeysAccessor.reset(buffers.get(sbIndex));
-			int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
-			if (c == 0) {
-				saIndex = link.pointers[i + 2];
-				break;
-			}
-		}
-		if (saIndex < 0) {
-			// Did not find the key. Insert a new entry.
-			saIndex = accumulatorSize++;
-			// Add keys
+    void insert(FrameTupleAccessor accessor, int tIndex) throws Exception {
+        int entry = tpc.partition(accessor, tIndex, table.length);
+        Link link = table[entry];
+        if (link == null) {
+            link = table[entry] = new Link();
+        }
+        int saIndex = -1;
+        for (int i = 0; i < link.size; i += 3) {
+            int sbIndex = link.pointers[i];
+            int stIndex = link.pointers[i + 1];
+            storedKeysAccessor.reset(buffers.get(sbIndex));
+            int c = ftpc.compare(accessor, tIndex, storedKeysAccessor, stIndex);
+            if (c == 0) {
+                saIndex = link.pointers[i + 2];
+                break;
+            }
+        }
+        if (saIndex < 0) {
+            // Did not find the key. Insert a new entry.
+            saIndex = accumulatorSize++;
+            // Add keys
 
-			// Add aggregation fields
-			AggregateState newState = aggregator.createAggregateStates();
+            // Add aggregation fields
+            AggregateState newState = aggregator.createAggregateStates();
 
-			stateTupleBuilder.reset();
-			for (int k = 0; k < keys.length; k++) {
-				stateTupleBuilder.addField(accessor, tIndex, keys[k]);
-			}
+            stateTupleBuilder.reset();
+            for (int k = 0; k < keys.length; k++) {
+                stateTupleBuilder.addField(accessor, tIndex, keys[k]);
+            }
 
-			aggregator.init(stateTupleBuilder, accessor, tIndex, newState);
+            aggregator.init(stateTupleBuilder, accessor, tIndex, newState);
 
-			if (!appender.appendSkipEmptyField(
-					stateTupleBuilder.getFieldEndOffsets(),
-					stateTupleBuilder.getByteArray(), 0,
-					stateTupleBuilder.getSize())) {
-				addNewBuffer();
-				if (!appender.appendSkipEmptyField(
-						stateTupleBuilder.getFieldEndOffsets(),
-						stateTupleBuilder.getByteArray(), 0,
-						stateTupleBuilder.getSize())) {
-					throw new HyracksDataException(
-							"Cannot init the aggregate state in a single frame.");
-				}
-			}
+            if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+                    stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
+                addNewBuffer();
+                if (!appender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+                        stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
+                    throw new HyracksDataException("Cannot init the aggregate state in a single frame.");
+                }
+            }
 
-			if (accumulatorSize >= aggregateStates.length) {
-				aggregateStates = Arrays.copyOf(aggregateStates,
-						aggregateStates.length * 2);
-			}
+            if (accumulatorSize >= aggregateStates.length) {
+                aggregateStates = Arrays.copyOf(aggregateStates, aggregateStates.length * 2);
+            }
 
-			aggregateStates[saIndex] = newState;
+            aggregateStates[saIndex] = newState;
 
-			link.add(lastBIndex, appender.getTupleCount() - 1, saIndex);
+            link.add(lastBIndex, appender.getTupleCount() - 1, saIndex);
 
-		} else {
-			aggregator.aggregate(accessor, tIndex, null, 0,
-					aggregateStates[saIndex]);
-		}
-	}
+        } else {
+            aggregator.aggregate(accessor, tIndex, null, 0, aggregateStates[saIndex]);
+        }
+    }
 
-	void write(IFrameWriter writer) throws HyracksDataException {
-		ByteBuffer buffer = ctx.allocateFrame();
-		appender.reset(buffer, true);
+    void write(IFrameWriter writer) throws HyracksDataException {
+        ByteBuffer buffer = ctx.allocateFrame();
+        appender.reset(buffer, true);
 
-		for (int i = 0; i < table.length; ++i) {
-			Link link = table[i];
-			if (link != null) {
-				for (int j = 0; j < link.size; j += 3) {
-					int bIndex = link.pointers[j];
-					int tIndex = link.pointers[j + 1];
-					int aIndex = link.pointers[j + 2];
-					ByteBuffer keyBuffer = buffers.get(bIndex);
-					storedKeysAccessor.reset(keyBuffer);
+        for (int i = 0; i < table.length; ++i) {
+            Link link = table[i];
+            if (link != null) {
+                for (int j = 0; j < link.size; j += 3) {
+                    int bIndex = link.pointers[j];
+                    int tIndex = link.pointers[j + 1];
+                    int aIndex = link.pointers[j + 2];
+                    ByteBuffer keyBuffer = buffers.get(bIndex);
+                    storedKeysAccessor.reset(keyBuffer);
 
-					// copy keys
-					outputTupleBuilder.reset();
-					for (int k = 0; k < storedKeys.length; k++) {
-						outputTupleBuilder.addField(storedKeysAccessor, tIndex,
-								storedKeys[k]);
-					}
+                    // copy keys
+                    outputTupleBuilder.reset();
+                    for (int k = 0; k < storedKeys.length; k++) {
+                        outputTupleBuilder.addField(storedKeysAccessor, tIndex, storedKeys[k]);
+                    }
 
-					aggregator
-							.outputFinalResult(outputTupleBuilder,
-									storedKeysAccessor, tIndex,
-									aggregateStates[aIndex]);
+                    aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor, tIndex,
+                            aggregateStates[aIndex]);
 
-					if (!appender.appendSkipEmptyField(
-							outputTupleBuilder.getFieldEndOffsets(),
-							outputTupleBuilder.getByteArray(), 0,
-							outputTupleBuilder.getSize())) {
-						writer.nextFrame(buffer);
-						appender.reset(buffer, true);
-						if (!appender.appendSkipEmptyField(
-								outputTupleBuilder.getFieldEndOffsets(),
-								outputTupleBuilder.getByteArray(), 0,
-								outputTupleBuilder.getSize())) {
-							throw new HyracksDataException(
-									"Cannot write the aggregation output into a frame.");
-						}
-					}
+                    if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+                        writer.nextFrame(buffer);
+                        appender.reset(buffer, true);
+                        if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+                                outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+                            throw new HyracksDataException("Cannot write the aggregation output into a frame.");
+                        }
+                    }
 
-				}
-			}
-		}
-		if (appender.getTupleCount() != 0) {
-			writer.nextFrame(buffer);
-		}
-	}
+                }
+            }
+        }
+        if (appender.getTupleCount() != 0) {
+            writer.nextFrame(buffer);
+        }
+    }
 
-	void close() throws HyracksDataException {
-		for (AggregateState aState : aggregateStates) {
-			aState.close();
-		}
-	}
+    void close() throws HyracksDataException {
+        for (AggregateState aState : aggregateStates) {
+            aState.close();
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index c089dd4..49443d1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -57,10 +57,8 @@
 
     private final int tableSize;
 
-    public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys,
-            ITuplePartitionComputerFactory tpcf,
-            IBinaryComparatorFactory[] comparatorFactories,
-            IAggregatorDescriptorFactory aggregatorFactory,
+    public HashGroupOperatorDescriptor(JobSpecification spec, int[] keys, ITuplePartitionComputerFactory tpcf,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor outRecordDescriptor, int tableSize) {
         super(spec, 1, 1);
         this.keys = keys;
@@ -80,12 +78,10 @@
      */
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId,
-                HASH_BUILD_ACTIVITY_ID));
+        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, HASH_BUILD_ACTIVITY_ID));
         builder.addActivity(ha);
 
-        OutputActivity oa = new OutputActivity(new ActivityId(odId,
-                OUTPUT_ACTIVITY_ID));
+        OutputActivity oa = new OutputActivity(new ActivityId(odId, OUTPUT_ACTIVITY_ID));
         builder.addActivity(oa);
 
         builder.addSourceEdge(0, ha, 0);
@@ -122,31 +118,24 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(
-                final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions) {
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(
-                    ctx.getFrameSize(),
-                    recordDescProvider.getInputRecordDescriptor(
-                            getOperatorId(), 0));
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
+                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
             return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private HashBuildActivityState state;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new HashBuildActivityState(ctx.getJobletContext()
-                            .getJobId(), new TaskId(getActivityId(), partition));
-                    state.table = new GroupingHashTable(ctx, keys,
-                            comparatorFactories, tpcf, aggregatorFactory,
-                            recordDescProvider.getInputRecordDescriptor(
-                                    getOperatorId(), 0), recordDescriptors[0],
+                    state = new HashBuildActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    state.table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
+                            recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
                             tableSize);
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer)
-                        throws HyracksDataException {
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                     accessor.reset(buffer);
                     int tupleCount = accessor.getTupleCount();
                     for (int i = 0; i < tupleCount; ++i) {
@@ -166,8 +155,7 @@
 
                 @Override
                 public void fail() throws HyracksDataException {
-                    throw new HyracksDataException(
-                            "HashGroupOperator is failed.");
+                    throw new HyracksDataException("HashGroupOperator is failed.");
                 }
             };
         }
@@ -181,17 +169,13 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(
-                final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    HashBuildActivityState buildState = (HashBuildActivityState) ctx
-                            .getTaskState(new TaskId(new ActivityId(
-                                    getOperatorId(), HASH_BUILD_ACTIVITY_ID),
-                                    partition));
+                    HashBuildActivityState buildState = (HashBuildActivityState) ctx.getTaskState(new TaskId(
+                            new ActivityId(getOperatorId(), HASH_BUILD_ACTIVITY_ID), partition));
                     GroupingHashTable table = buildState.table;
                     writer.open();
                     try {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 29bd083..831e497 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -44,312 +44,265 @@
  */
 public class HashSpillableTableFactory implements ISpillableTableFactory {
 
-	private static final long serialVersionUID = 1L;
-	private final ITuplePartitionComputerFactory tpcf;
-	private final int tableSize;
+    private static final long serialVersionUID = 1L;
+    private final ITuplePartitionComputerFactory tpcf;
+    private final int tableSize;
 
-	public HashSpillableTableFactory(ITuplePartitionComputerFactory tpcf,
-			int tableSize) {
-		this.tpcf = tpcf;
-		this.tableSize = tableSize;
-	}
+    public HashSpillableTableFactory(ITuplePartitionComputerFactory tpcf, int tableSize) {
+        this.tpcf = tpcf;
+        this.tableSize = tableSize;
+    }
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * edu.uci.ics.hyracks.dataflow.std.aggregations.ISpillableTableFactory#
-	 * buildSpillableTable(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
-	 * int[], edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory[],
-	 * edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory,
-	 * edu.
-	 * uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory
-	 * [], edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
-	 * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int)
-	 */
-	@Override
-	public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx,
-			final int[] keyFields,
-			IBinaryComparatorFactory[] comparatorFactories,
-			INormalizedKeyComputerFactory firstKeyNormalizerFactory,
-			IAggregatorDescriptorFactory aggregateFactory,
-			RecordDescriptor inRecordDescriptor,
-			RecordDescriptor outRecordDescriptor, final int framesLimit)
-			throws HyracksDataException {
-		final int[] storedKeys = new int[keyFields.length];
-		@SuppressWarnings("rawtypes")
-		ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
-		for (int i = 0; i < keyFields.length; i++) {
-			storedKeys[i] = i;
-			storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
-		}
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * edu.uci.ics.hyracks.dataflow.std.aggregations.ISpillableTableFactory#
+     * buildSpillableTable(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * int[], edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory[],
+     * edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory,
+     * edu.
+     * uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory
+     * [], edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int)
+     */
+    @Override
+    public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, final int[] keyFields,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, final int framesLimit) throws HyracksDataException {
+        final int[] storedKeys = new int[keyFields.length];
+        @SuppressWarnings("rawtypes")
+        ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keyFields.length];
+        for (int i = 0; i < keyFields.length; i++) {
+            storedKeys[i] = i;
+            storedKeySerDeser[i] = inRecordDescriptor.getFields()[keyFields[i]];
+        }
 
-		RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
-		final FrameTupleAccessor storedKeysAccessor1;
-		final FrameTupleAccessor storedKeysAccessor2;
-		if (keyFields.length >= outRecordDescriptor.getFields().length) {
-			// for the case of zero-aggregations
-			ISerializerDeserializer<?>[] fields = outRecordDescriptor
-					.getFields();
-			ITypeTraits[] types = outRecordDescriptor.getTypeTraits();
-			ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
-			for (int i = 0; i < fields.length; i++)
-				newFields[i] = fields[i];
-			ITypeTraits[] newTypes = null;
-			if (types != null) {
-				newTypes = new ITypeTraits[types.length + 1];
-				for (int i = 0; i < types.length; i++)
-					newTypes[i] = types[i];
-			}
-			internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
-		}
-		storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(),
-				internalRecordDescriptor);
-		storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(),
-				internalRecordDescriptor);
+        RecordDescriptor internalRecordDescriptor = outRecordDescriptor;
+        final FrameTupleAccessor storedKeysAccessor1;
+        final FrameTupleAccessor storedKeysAccessor2;
+        if (keyFields.length >= outRecordDescriptor.getFields().length) {
+            // for the case of zero-aggregations
+            ISerializerDeserializer<?>[] fields = outRecordDescriptor.getFields();
+            ITypeTraits[] types = outRecordDescriptor.getTypeTraits();
+            ISerializerDeserializer<?>[] newFields = new ISerializerDeserializer[fields.length + 1];
+            for (int i = 0; i < fields.length; i++)
+                newFields[i] = fields[i];
+            ITypeTraits[] newTypes = null;
+            if (types != null) {
+                newTypes = new ITypeTraits[types.length + 1];
+                for (int i = 0; i < types.length; i++)
+                    newTypes[i] = types[i];
+            }
+            internalRecordDescriptor = new RecordDescriptor(newFields, newTypes);
+        }
+        storedKeysAccessor1 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
+        storedKeysAccessor2 = new FrameTupleAccessor(ctx.getFrameSize(), internalRecordDescriptor);
 
-		final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-		for (int i = 0; i < comparatorFactories.length; ++i) {
-			comparators[i] = comparatorFactories[i].createBinaryComparator();
-		}
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
 
-        final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(
-                keyFields, storedKeys, comparators);
+        final FrameTuplePairComparator ftpcPartial = new FrameTuplePairComparator(keyFields, storedKeys, comparators);
 
-        final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(
-                storedKeys, storedKeys, comparators);
+        final FrameTuplePairComparator ftpcTuple = new FrameTuplePairComparator(storedKeys, storedKeys, comparators);
 
-		final ITuplePartitionComputer tpc = tpcf.createPartitioner();
+        final ITuplePartitionComputer tpc = tpcf.createPartitioner();
 
-		final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null
-				: firstKeyNormalizerFactory.createNormalizedKeyComputer();
+        final INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
+                .createNormalizedKeyComputer();
 
-		int[] keyFieldsInPartialResults = new int[keyFields.length];
-		for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-			keyFieldsInPartialResults[i] = i;
-		}
+        int[] keyFieldsInPartialResults = new int[keyFields.length];
+        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+            keyFieldsInPartialResults[i] = i;
+        }
 
-		final IAggregatorDescriptor aggregator = aggregateFactory
-				.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
-						keyFields, keyFieldsInPartialResults);
+        final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
+                outRecordDescriptor, keyFields, keyFieldsInPartialResults);
 
-		final AggregateState aggregateState = aggregator
-				.createAggregateStates();
+        final AggregateState aggregateState = aggregator.createAggregateStates();
 
         final ArrayTupleBuilder stateTupleBuilder;
         if (keyFields.length < outRecordDescriptor.getFields().length) {
-            stateTupleBuilder = new ArrayTupleBuilder(
-                    outRecordDescriptor.getFields().length);
+            stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
         } else {
-            stateTupleBuilder = new ArrayTupleBuilder(
-                    outRecordDescriptor.getFields().length + 1);
+            stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length + 1);
         }
 
-        final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(
-                outRecordDescriptor.getFields().length);
+        final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
         return new ISpillableTable() {
 
             private int lastBufIndex;
 
-			private ByteBuffer outputFrame;
+            private ByteBuffer outputFrame;
             private FrameTupleAppender outputAppender;
 
-            private FrameTupleAppender stateAppender = new FrameTupleAppender(
-                    ctx.getFrameSize());
+            private FrameTupleAppender stateAppender = new FrameTupleAppender(ctx.getFrameSize());
 
-            private final ISerializableTable table = new SerializableHashTable(
-                    tableSize, ctx);
+            private final ISerializableTable table = new SerializableHashTable(tableSize, ctx);
             private final TuplePointer storedTuplePointer = new TuplePointer();
             private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
 
-			/**
-			 * A tuple is "pointed" to by 3 entries in the tPointers array. [0]
-			 * = Frame index in the "Frames" list, [1] = Tuple index in the
-			 * frame, [2] = Poor man's normalized key for the tuple.
-			 */
-			private int[] tPointers;
+            /**
+             * A tuple is "pointed" to by 3 entries in the tPointers array. [0]
+             * = Frame index in the "Frames" list, [1] = Tuple index in the
+             * frame, [2] = Poor man's normalized key for the tuple.
+             */
+            private int[] tPointers;
 
-			@Override
-			public void sortFrames() {
-				int sfIdx = storedKeys[0];
-				int totalTCount = table.getTupleCount();
-				tPointers = new int[totalTCount * 3];
-				int ptr = 0;
+            @Override
+            public void sortFrames() {
+                int sfIdx = storedKeys[0];
+                int totalTCount = table.getTupleCount();
+                tPointers = new int[totalTCount * 3];
+                int ptr = 0;
 
-				for (int i = 0; i < tableSize; i++) {
-					int entry = i;
-					int offset = 0;
-					do {
-						table.getTuplePointer(entry, offset, storedTuplePointer);
-						if (storedTuplePointer.frameIndex < 0)
-							break;
-						tPointers[ptr * 3] = entry;
-						tPointers[ptr * 3 + 1] = offset;
-						table.getTuplePointer(entry, offset, storedTuplePointer);
-						int fIndex = storedTuplePointer.frameIndex;
-						int tIndex = storedTuplePointer.tupleIndex;
-						storedKeysAccessor1.reset(frames.get(fIndex));
-						int tStart = storedKeysAccessor1
-								.getTupleStartOffset(tIndex);
-						int f0StartRel = storedKeysAccessor1
-								.getFieldStartOffset(tIndex, sfIdx);
-						int f0EndRel = storedKeysAccessor1.getFieldEndOffset(
-								tIndex, sfIdx);
-						int f0Start = f0StartRel + tStart
-								+ storedKeysAccessor1.getFieldSlotsLength();
-						tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc
-								.normalize(storedKeysAccessor1.getBuffer()
-										.array(), f0Start, f0EndRel
-										- f0StartRel);
-						ptr++;
-						offset++;
-					} while (true);
-				}
-				/**
-				 * Sort using quick sort
-				 */
-				if (tPointers.length > 0) {
-					sort(tPointers, 0, totalTCount);
-				}
-			}
+                for (int i = 0; i < tableSize; i++) {
+                    int entry = i;
+                    int offset = 0;
+                    do {
+                        table.getTuplePointer(entry, offset, storedTuplePointer);
+                        if (storedTuplePointer.frameIndex < 0)
+                            break;
+                        tPointers[ptr * 3] = entry;
+                        tPointers[ptr * 3 + 1] = offset;
+                        table.getTuplePointer(entry, offset, storedTuplePointer);
+                        int fIndex = storedTuplePointer.frameIndex;
+                        int tIndex = storedTuplePointer.tupleIndex;
+                        storedKeysAccessor1.reset(frames.get(fIndex));
+                        int tStart = storedKeysAccessor1.getTupleStartOffset(tIndex);
+                        int f0StartRel = storedKeysAccessor1.getFieldStartOffset(tIndex, sfIdx);
+                        int f0EndRel = storedKeysAccessor1.getFieldEndOffset(tIndex, sfIdx);
+                        int f0Start = f0StartRel + tStart + storedKeysAccessor1.getFieldSlotsLength();
+                        tPointers[ptr * 3 + 2] = nkc == null ? 0 : nkc.normalize(storedKeysAccessor1.getBuffer()
+                                .array(), f0Start, f0EndRel - f0StartRel);
+                        ptr++;
+                        offset++;
+                    } while (true);
+                }
+                /**
+                 * Sort using quick sort
+                 */
+                if (tPointers.length > 0) {
+                    sort(tPointers, 0, totalTCount);
+                }
+            }
 
-			@Override
-			public void reset() {
-				lastBufIndex = -1;
-				tPointers = null;
-				table.reset();
-				aggregator.reset();
-			}
+            @Override
+            public void reset() {
+                lastBufIndex = -1;
+                tPointers = null;
+                table.reset();
+                aggregator.reset();
+            }
 
-			@Override
-			public boolean insert(FrameTupleAccessor accessor, int tIndex)
-					throws HyracksDataException {
-				if (lastBufIndex < 0)
-					nextAvailableFrame();
-				int entry = tpc.partition(accessor, tIndex, tableSize);
-				boolean foundGroup = false;
-				int offset = 0;
-				do {
-					table.getTuplePointer(entry, offset++, storedTuplePointer);
-					if (storedTuplePointer.frameIndex < 0)
-						break;
-					storedKeysAccessor1.reset(frames
-							.get(storedTuplePointer.frameIndex));
-					int c = ftpcPartial.compare(accessor, tIndex,
-							storedKeysAccessor1, storedTuplePointer.tupleIndex);
-					if (c == 0) {
-						foundGroup = true;
-						break;
-					}
-				} while (true);
+            @Override
+            public boolean insert(FrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                if (lastBufIndex < 0)
+                    nextAvailableFrame();
+                int entry = tpc.partition(accessor, tIndex, tableSize);
+                boolean foundGroup = false;
+                int offset = 0;
+                do {
+                    table.getTuplePointer(entry, offset++, storedTuplePointer);
+                    if (storedTuplePointer.frameIndex < 0)
+                        break;
+                    storedKeysAccessor1.reset(frames.get(storedTuplePointer.frameIndex));
+                    int c = ftpcPartial.compare(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex);
+                    if (c == 0) {
+                        foundGroup = true;
+                        break;
+                    }
+                } while (true);
 
-				if (!foundGroup) {
+                if (!foundGroup) {
 
                     stateTupleBuilder.reset();
 
-                    aggregator.init(stateTupleBuilder, accessor, tIndex,
-                            aggregateState);
-                    if (!stateAppender.appendSkipEmptyField(
-                            stateTupleBuilder.getFieldEndOffsets(),
-                            stateTupleBuilder.getByteArray(), 0,
-                            stateTupleBuilder.getSize())) {
+                    aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);
+                    if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+                            stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
                         if (!nextAvailableFrame()) {
                             return false;
                         }
-                        if (!stateAppender.appendSkipEmptyField(
-                                stateTupleBuilder.getFieldEndOffsets(),
-                                stateTupleBuilder.getByteArray(), 0,
-                                stateTupleBuilder.getSize())) {
-                            throw new HyracksDataException(
-                                    "Cannot init external aggregate state in a frame.");
+                        if (!stateAppender.appendSkipEmptyField(stateTupleBuilder.getFieldEndOffsets(),
+                                stateTupleBuilder.getByteArray(), 0, stateTupleBuilder.getSize())) {
+                            throw new HyracksDataException("Cannot init external aggregate state in a frame.");
                         }
                     }
 
                     storedTuplePointer.frameIndex = lastBufIndex;
-                    storedTuplePointer.tupleIndex = stateAppender
-                            .getTupleCount() - 1;
+                    storedTuplePointer.tupleIndex = stateAppender.getTupleCount() - 1;
                     table.insert(entry, storedTuplePointer);
                 } else {
 
-					aggregator.aggregate(accessor, tIndex, storedKeysAccessor1,
-							storedTuplePointer.tupleIndex, aggregateState);
+                    aggregator.aggregate(accessor, tIndex, storedKeysAccessor1, storedTuplePointer.tupleIndex,
+                            aggregateState);
 
-				}
-				return true;
-			}
+                }
+                return true;
+            }
 
-			@Override
-			public List<ByteBuffer> getFrames() {
-				return frames;
-			}
+            @Override
+            public List<ByteBuffer> getFrames() {
+                return frames;
+            }
 
-			@Override
-			public int getFrameCount() {
-				return lastBufIndex;
-			}
+            @Override
+            public int getFrameCount() {
+                return lastBufIndex;
+            }
 
-			@Override
-			public void flushFrames(IFrameWriter writer, boolean isPartial)
-					throws HyracksDataException {
-				if (outputFrame == null) {
-					outputFrame = ctx.allocateFrame();
-				}
+            @Override
+            public void flushFrames(IFrameWriter writer, boolean isPartial) throws HyracksDataException {
+                if (outputFrame == null) {
+                    outputFrame = ctx.allocateFrame();
+                }
 
                 if (outputAppender == null) {
-                    outputAppender = new FrameTupleAppender(
-                            outputFrame.capacity());
+                    outputAppender = new FrameTupleAppender(outputFrame.capacity());
                 }
 
                 outputAppender.reset(outputFrame, true);
 
                 writer.open();
 
-				if (tPointers == null) {
-					// Not sorted
-					for (int i = 0; i < tableSize; ++i) {
-						int entry = i;
-						int offset = 0;
-						do {
-							table.getTuplePointer(entry, offset++,
-									storedTuplePointer);
-							if (storedTuplePointer.frameIndex < 0)
-								break;
-							int bIndex = storedTuplePointer.frameIndex;
-							int tIndex = storedTuplePointer.tupleIndex;
+                if (tPointers == null) {
+                    // Not sorted
+                    for (int i = 0; i < tableSize; ++i) {
+                        int entry = i;
+                        int offset = 0;
+                        do {
+                            table.getTuplePointer(entry, offset++, storedTuplePointer);
+                            if (storedTuplePointer.frameIndex < 0)
+                                break;
+                            int bIndex = storedTuplePointer.frameIndex;
+                            int tIndex = storedTuplePointer.tupleIndex;
 
-							storedKeysAccessor1.reset(frames.get(bIndex));
+                            storedKeysAccessor1.reset(frames.get(bIndex));
 
                             outputTupleBuilder.reset();
 
-							if (isPartial) {
+                            if (isPartial) {
 
-                                aggregator.outputPartialResult(
-                                        outputTupleBuilder,
-                                        storedKeysAccessor1, tIndex,
+                                aggregator.outputPartialResult(outputTupleBuilder, storedKeysAccessor1, tIndex,
                                         aggregateState);
 
                             } else {
 
-                                aggregator.outputFinalResult(
-                                        outputTupleBuilder,
-                                        storedKeysAccessor1, tIndex,
+                                aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor1, tIndex,
                                         aggregateState);
                             }
 
-                            if (!outputAppender.appendSkipEmptyField(
-                                    outputTupleBuilder.getFieldEndOffsets(),
-                                    outputTupleBuilder.getByteArray(), 0,
-                                    outputTupleBuilder.getSize())) {
+                            if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+                                    outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                                 FrameUtils.flushFrame(outputFrame, writer);
                                 outputAppender.reset(outputFrame, true);
-                                if (!outputAppender
-                                        .appendSkipEmptyField(
-                                                outputTupleBuilder
-                                                        .getFieldEndOffsets(),
-                                                outputTupleBuilder
-                                                        .getByteArray(), 0,
-                                                outputTupleBuilder.getSize())) {
+                                if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+                                        outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                                     throw new HyracksDataException(
                                             "The output item is too large to be fit into a frame.");
                                 }
@@ -368,8 +321,7 @@
                 for (int ptr = 0; ptr < n; ptr++) {
                     int tableIndex = tPointers[ptr * 3];
                     int rowIndex = tPointers[ptr * 3 + 1];
-                    table.getTuplePointer(tableIndex, rowIndex,
-                            storedTuplePointer);
+                    table.getTuplePointer(tableIndex, rowIndex, storedTuplePointer);
                     int frameIndex = storedTuplePointer.frameIndex;
                     int tupleIndex = storedTuplePointer.tupleIndex;
                     // Get the frame containing the value
@@ -378,33 +330,24 @@
 
                     outputTupleBuilder.reset();
 
-					if (isPartial) {
+                    if (isPartial) {
 
-                        aggregator
-                                .outputPartialResult(outputTupleBuilder,
-                                        storedKeysAccessor1, tupleIndex,
-                                        aggregateState);
+                        aggregator.outputPartialResult(outputTupleBuilder, storedKeysAccessor1, tupleIndex,
+                                aggregateState);
 
                     } else {
 
-                        aggregator
-                                .outputFinalResult(outputTupleBuilder,
-                                        storedKeysAccessor1, tupleIndex,
-                                        aggregateState);
+                        aggregator.outputFinalResult(outputTupleBuilder, storedKeysAccessor1, tupleIndex,
+                                aggregateState);
                     }
 
-                    if (!outputAppender.appendSkipEmptyField(
-                            outputTupleBuilder.getFieldEndOffsets(),
-                            outputTupleBuilder.getByteArray(), 0,
-                            outputTupleBuilder.getSize())) {
+                    if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+                            outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                         FrameUtils.flushFrame(outputFrame, writer);
                         outputAppender.reset(outputFrame, true);
-                        if (!outputAppender.appendSkipEmptyField(
-                                outputTupleBuilder.getFieldEndOffsets(),
-                                outputTupleBuilder.getByteArray(), 0,
-                                outputTupleBuilder.getSize())) {
-                            throw new HyracksDataException(
-                                    "The output item is too large to be fit into a frame.");
+                        if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+                                outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+                            throw new HyracksDataException("The output item is too large to be fit into a frame.");
                         }
                     }
                 }
@@ -415,27 +358,27 @@
                 aggregator.close();
             }
 
-			@Override
-			public void close() {
-				lastBufIndex = -1;
-				tPointers = null;
-				table.close();
-				frames.clear();
-				aggregateState.close();
-			}
+            @Override
+            public void close() {
+                lastBufIndex = -1;
+                tPointers = null;
+                table.close();
+                frames.clear();
+                aggregateState.close();
+            }
 
-			/**
-			 * Set the working frame to the next available frame in the frame
-			 * list. There are two cases:<br>
-			 * 1) If the next frame is not initialized, allocate a new frame. 2)
-			 * When frames are already created, they are recycled.
-			 * 
-			 * @return Whether a new frame is added successfully.
-			 */
-			private boolean nextAvailableFrame() {
-				// Return false if the number of frames is equal to the limit.
-				if (lastBufIndex + 1 >= framesLimit)
-					return false;
+            /**
+             * Set the working frame to the next available frame in the frame
+             * list. There are two cases:<br>
+             * 1) If the next frame is not initialized, allocate a new frame. 2)
+             * When frames are already created, they are recycled.
+             * 
+             * @return Whether a new frame is added successfully.
+             */
+            private boolean nextAvailableFrame() {
+                // Return false if the number of frames is equal to the limit.
+                if (lastBufIndex + 1 >= framesLimit)
+                    return false;
 
                 if (frames.size() < framesLimit) {
                     // Insert a new frame
@@ -456,107 +399,101 @@
                 return true;
             }
 
-			private void sort(int[] tPointers, int offset, int length) {
-				int m = offset + (length >> 1);
-				int mTable = tPointers[m * 3];
-				int mRow = tPointers[m * 3 + 1];
-				int mNormKey = tPointers[m * 3 + 2];
+            private void sort(int[] tPointers, int offset, int length) {
+                int m = offset + (length >> 1);
+                int mTable = tPointers[m * 3];
+                int mRow = tPointers[m * 3 + 1];
+                int mNormKey = tPointers[m * 3 + 2];
 
-				table.getTuplePointer(mTable, mRow, storedTuplePointer);
-				int mFrame = storedTuplePointer.frameIndex;
-				int mTuple = storedTuplePointer.tupleIndex;
-				storedKeysAccessor1.reset(frames.get(mFrame));
+                table.getTuplePointer(mTable, mRow, storedTuplePointer);
+                int mFrame = storedTuplePointer.frameIndex;
+                int mTuple = storedTuplePointer.tupleIndex;
+                storedKeysAccessor1.reset(frames.get(mFrame));
 
-				int a = offset;
-				int b = a;
-				int c = offset + length - 1;
-				int d = c;
-				while (true) {
-					while (b <= c) {
-						int bTable = tPointers[b * 3];
-						int bRow = tPointers[b * 3 + 1];
-						int bNormKey = tPointers[b * 3 + 2];
-						int cmp = 0;
-						if (bNormKey != mNormKey) {
-							cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
-									: 1;
-						} else {
-							table.getTuplePointer(bTable, bRow,
-									storedTuplePointer);
-							int bFrame = storedTuplePointer.frameIndex;
-							int bTuple = storedTuplePointer.tupleIndex;
-							storedKeysAccessor2.reset(frames.get(bFrame));
-							cmp = ftpcTuple.compare(storedKeysAccessor2,
-									bTuple, storedKeysAccessor1, mTuple);
-						}
-						if (cmp > 0) {
-							break;
-						}
-						if (cmp == 0) {
-							swap(tPointers, a++, b);
-						}
-						++b;
-					}
-					while (c >= b) {
-						int cTable = tPointers[c * 3];
-						int cRow = tPointers[c * 3 + 1];
-						int cNormKey = tPointers[c * 3 + 2];
-						int cmp = 0;
-						if (cNormKey != mNormKey) {
-							cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
-									: 1;
-						} else {
-							table.getTuplePointer(cTable, cRow,
-									storedTuplePointer);
-							int cFrame = storedTuplePointer.frameIndex;
-							int cTuple = storedTuplePointer.tupleIndex;
-							storedKeysAccessor2.reset(frames.get(cFrame));
-							cmp = ftpcTuple.compare(storedKeysAccessor2,
-									cTuple, storedKeysAccessor1, mTuple);
-						}
-						if (cmp < 0) {
-							break;
-						}
-						if (cmp == 0) {
-							swap(tPointers, c, d--);
-						}
-						--c;
-					}
-					if (b > c)
-						break;
-					swap(tPointers, b++, c--);
-				}
+                int a = offset;
+                int b = a;
+                int c = offset + length - 1;
+                int d = c;
+                while (true) {
+                    while (b <= c) {
+                        int bTable = tPointers[b * 3];
+                        int bRow = tPointers[b * 3 + 1];
+                        int bNormKey = tPointers[b * 3 + 2];
+                        int cmp = 0;
+                        if (bNormKey != mNormKey) {
+                            cmp = ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+                        } else {
+                            table.getTuplePointer(bTable, bRow, storedTuplePointer);
+                            int bFrame = storedTuplePointer.frameIndex;
+                            int bTuple = storedTuplePointer.tupleIndex;
+                            storedKeysAccessor2.reset(frames.get(bFrame));
+                            cmp = ftpcTuple.compare(storedKeysAccessor2, bTuple, storedKeysAccessor1, mTuple);
+                        }
+                        if (cmp > 0) {
+                            break;
+                        }
+                        if (cmp == 0) {
+                            swap(tPointers, a++, b);
+                        }
+                        ++b;
+                    }
+                    while (c >= b) {
+                        int cTable = tPointers[c * 3];
+                        int cRow = tPointers[c * 3 + 1];
+                        int cNormKey = tPointers[c * 3 + 2];
+                        int cmp = 0;
+                        if (cNormKey != mNormKey) {
+                            cmp = ((((long) cNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1 : 1;
+                        } else {
+                            table.getTuplePointer(cTable, cRow, storedTuplePointer);
+                            int cFrame = storedTuplePointer.frameIndex;
+                            int cTuple = storedTuplePointer.tupleIndex;
+                            storedKeysAccessor2.reset(frames.get(cFrame));
+                            cmp = ftpcTuple.compare(storedKeysAccessor2, cTuple, storedKeysAccessor1, mTuple);
+                        }
+                        if (cmp < 0) {
+                            break;
+                        }
+                        if (cmp == 0) {
+                            swap(tPointers, c, d--);
+                        }
+                        --c;
+                    }
+                    if (b > c)
+                        break;
+                    swap(tPointers, b++, c--);
+                }
 
-				int s;
-				int n = offset + length;
-				s = Math.min(a - offset, b - a);
-				vecswap(tPointers, offset, b - s, s);
-				s = Math.min(d - c, n - d - 1);
-				vecswap(tPointers, b, n - s, s);
+                int s;
+                int n = offset + length;
+                s = Math.min(a - offset, b - a);
+                vecswap(tPointers, offset, b - s, s);
+                s = Math.min(d - c, n - d - 1);
+                vecswap(tPointers, b, n - s, s);
 
-				if ((s = b - a) > 1) {
-					sort(tPointers, offset, s);
-				}
-				if ((s = d - c) > 1) {
-					sort(tPointers, n - s, s);
-				}
-			}
+                if ((s = b - a) > 1) {
+                    sort(tPointers, offset, s);
+                }
+                if ((s = d - c) > 1) {
+                    sort(tPointers, n - s, s);
+                }
+            }
 
-			private void swap(int x[], int a, int b) {
-				for (int i = 0; i < 3; ++i) {
-					int t = x[a * 3 + i];
-					x[a * 3 + i] = x[b * 3 + i];
-					x[b * 3 + i] = t;
-				}
-			}
+            private void swap(int x[], int a, int b) {
+                for (int i = 0; i < 3; ++i) {
+                    int t = x[a * 3 + i];
+                    x[a * 3 + i] = x[b * 3 + i];
+                    x[b * 3 + i] = t;
+                }
+            }
 
-			private void vecswap(int x[], int a, int b, int n) {
-				for (int i = 0; i < n; i++, a++, b++) {
-					swap(x, a, b);
-				}
-			}
+            private void vecswap(int x[], int a, int b, int n) {
+                for (int i = 0; i < n; i++, a++, b++) {
+                    swap(x, a, b);
+                }
+            }
 
-		};
-	}
+        };
+    }
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index 329eb4b..2cf978c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -42,8 +42,8 @@
      *            The state to be initialized.
      * @throws HyracksDataException
      */
-    public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
-            int tIndex, AggregateState state) throws HyracksDataException;
+    public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+            throws HyracksDataException;
 
     /**
      * Reset the aggregator. The corresponding aggregate state should be reset
@@ -69,9 +69,8 @@
      *            The aggregate state.
      * @throws HyracksDataException
      */
-    public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-            IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-            AggregateState state) throws HyracksDataException;
+    public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+            int stateTupleIndex, AggregateState state) throws HyracksDataException;
 
     /**
      * Output the partial aggregation result.
@@ -85,9 +84,8 @@
      *            The aggregation state.
      * @throws HyracksDataException
      */
-    public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-            IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-            throws HyracksDataException;
+    public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException;
 
     /**
      * Output the final aggregation result.
@@ -101,9 +99,8 @@
      *            The aggregation state.
      * @throws HyracksDataException
      */
-    public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-            IFrameTupleAccessor accessor, int tIndex, AggregateState state)
-            throws HyracksDataException;
+    public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException;
 
     public void close();
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index 339c29f..9ff915d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -25,8 +25,8 @@
  */
 public interface IAggregatorDescriptorFactory extends Serializable {
 
-    IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
-            RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults) throws HyracksDataException;
-    
+    IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
+            throws HyracksDataException;
+
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
index 32f4365..a34fa73 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptor.java
@@ -41,8 +41,7 @@
      *            The state to be initialized.
      * @throws HyracksDataException
      */
-    public void init(IFrameTupleAccessor accessor, int tIndex,
-            DataOutput fieldOutput, AggregateState state)
+    public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
             throws HyracksDataException;
 
     /**
@@ -74,8 +73,7 @@
      *            The aggregate state.
      * @throws HyracksDataException
      */
-    public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-            byte[] data, int offset, AggregateState state)
+    public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, AggregateState state)
             throws HyracksDataException;
 
     /**
@@ -90,8 +88,8 @@
      *            The aggregation state.
      * @throws HyracksDataException
      */
-    public void outputPartialResult(DataOutput fieldOutput, byte[] data,
-            int offset, AggregateState state) throws HyracksDataException;
+    public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+            throws HyracksDataException;
 
     /**
      * Output the final aggregation result.
@@ -105,8 +103,8 @@
      *            The aggregation state.
      * @throws HyracksDataException
      */
-    public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-            int offset, AggregateState state) throws HyracksDataException;
+    public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+            throws HyracksDataException;
 
     public boolean needsBinaryState();
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
index ee35c5e..6f50597 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IFieldAggregateDescriptorFactory.java
@@ -25,8 +25,7 @@
  */
 public interface IFieldAggregateDescriptorFactory extends Serializable {
 
-	public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
-			RecordDescriptor inRecordDescriptor,
-			RecordDescriptor outRecordDescriptor) throws HyracksDataException;
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException;
 
 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index 17ad5f0..c718ee4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -29,17 +29,15 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
-public class PreclusteredGroupOperatorDescriptor extends
-        AbstractSingleActivityOperatorDescriptor {
+public class PreclusteredGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private final int[] groupFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final IAggregatorDescriptorFactory aggregatorFactory;
 
     private static final long serialVersionUID = 1L;
 
-    public PreclusteredGroupOperatorDescriptor(JobSpecification spec,
-            int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
-            IAggregatorDescriptorFactory aggregatorFactory,
+    public PreclusteredGroupOperatorDescriptor(JobSpecification spec, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.groupFields = groupFields;
@@ -49,40 +47,33 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(
-            final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition,
-            int nPartitions) throws HyracksDataException {
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        final RecordDescriptor inRecordDesc = recordDescProvider
-                .getInputRecordDescriptor(getOperatorId(), 0);
-        final IAggregatorDescriptor aggregator = aggregatorFactory
-                .createAggregator(ctx, inRecordDesc, recordDescriptors[0],
-                        groupFields, groupFields);
+        final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+        final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+                recordDescriptors[0], groupFields, groupFields);
         final ByteBuffer copyFrame = ctx.allocateFrame();
-        final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(
-                ctx.getFrameSize(), inRecordDesc);
+        final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
         copyFrameAccessor.reset(copyFrame);
         ByteBuffer outFrame = ctx.allocateFrame();
-        final FrameTupleAppender appender = new FrameTupleAppender(
-                ctx.getFrameSize());
+        final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
         appender.reset(outFrame, true);
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             private PreclusteredGroupWriter pgw;
 
             @Override
             public void open() throws HyracksDataException {
-                pgw = new PreclusteredGroupWriter(ctx, groupFields,
-                        comparators, aggregator, inRecordDesc, recordDescriptors[0], writer);
+                pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
+                        recordDescriptors[0], writer);
                 pgw.open();
             }
 
             @Override
-            public void nextFrame(ByteBuffer buffer)
-                    throws HyracksDataException {
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 pgw.nextFrame(buffer);
             }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index ee8489a..7e4baac 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -27,157 +27,138 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class PreclusteredGroupWriter implements IFrameWriter {
-	private final int[] groupFields;
-	private final IBinaryComparator[] comparators;
-	private final IAggregatorDescriptor aggregator;
-	private final AggregateState aggregateState;
-	private final IFrameWriter writer;
-	private final ByteBuffer copyFrame;
-	private final FrameTupleAccessor inFrameAccessor;
-	private final FrameTupleAccessor copyFrameAccessor;
+    private final int[] groupFields;
+    private final IBinaryComparator[] comparators;
+    private final IAggregatorDescriptor aggregator;
+    private final AggregateState aggregateState;
+    private final IFrameWriter writer;
+    private final ByteBuffer copyFrame;
+    private final FrameTupleAccessor inFrameAccessor;
+    private final FrameTupleAccessor copyFrameAccessor;
 
-	private final ByteBuffer outFrame;
-	private final FrameTupleAppender appender;
-	private final ArrayTupleBuilder tupleBuilder;
+    private final ByteBuffer outFrame;
+    private final FrameTupleAppender appender;
+    private final ArrayTupleBuilder tupleBuilder;
 
-	private boolean first;
+    private boolean first;
 
-	public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields,
-			IBinaryComparator[] comparators, IAggregatorDescriptor aggregator,
-			RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
-			IFrameWriter writer) {
-		this.groupFields = groupFields;
-		this.comparators = comparators;
-		this.aggregator = aggregator;
-		this.aggregateState = aggregator.createAggregateStates();
-		this.writer = writer;
-		copyFrame = ctx.allocateFrame();
-		inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-				inRecordDesc);
-		copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-				inRecordDesc);
-		copyFrameAccessor.reset(copyFrame);
+    public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+            IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
+            IFrameWriter writer) {
+        this.groupFields = groupFields;
+        this.comparators = comparators;
+        this.aggregator = aggregator;
+        this.aggregateState = aggregator.createAggregateStates();
+        this.writer = writer;
+        copyFrame = ctx.allocateFrame();
+        inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+        copyFrameAccessor.reset(copyFrame);
 
-		outFrame = ctx.allocateFrame();
-		appender = new FrameTupleAppender(ctx.getFrameSize());
-		appender.reset(outFrame, true);
+        outFrame = ctx.allocateFrame();
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(outFrame, true);
 
-		tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
-	}
+        tupleBuilder = new ArrayTupleBuilder(outRecordDesc.getFields().length);
+    }
 
-	@Override
-	public void open() throws HyracksDataException {
-		writer.open();
-		first = true;
-	}
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+        first = true;
+    }
 
-	@Override
-	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-		inFrameAccessor.reset(buffer);
-		int nTuples = inFrameAccessor.getTupleCount();
-		for (int i = 0; i < nTuples; ++i) {
-			if (first) {
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inFrameAccessor.reset(buffer);
+        int nTuples = inFrameAccessor.getTupleCount();
+        for (int i = 0; i < nTuples; ++i) {
+            if (first) {
 
-				tupleBuilder.reset();
-				for (int j = 0; j < groupFields.length; j++) {
-					tupleBuilder.addField(inFrameAccessor, i, j);
-				}
-				aggregator.init(tupleBuilder, inFrameAccessor, i,
-						aggregateState);
+                tupleBuilder.reset();
+                for (int j = 0; j < groupFields.length; j++) {
+                    tupleBuilder.addField(inFrameAccessor, i, j);
+                }
+                aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
 
-				first = false;
+                first = false;
 
-			} else {
-				if (i == 0) {
-					switchGroupIfRequired(copyFrameAccessor,
-							copyFrameAccessor.getTupleCount() - 1,
-							inFrameAccessor, i);
-				} else {
-					switchGroupIfRequired(inFrameAccessor, i - 1,
-							inFrameAccessor, i);
-				}
+            } else {
+                if (i == 0) {
+                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+                } else {
+                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                }
 
-			}
-		}
-		FrameUtils.copy(buffer, copyFrame);
-	}
+            }
+        }
+        FrameUtils.copy(buffer, copyFrame);
+    }
 
-	private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor,
-			int prevTupleIndex, FrameTupleAccessor currTupleAccessor,
-			int currTupleIndex) throws HyracksDataException {
-		if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor,
-				currTupleIndex)) {
-			writeOutput(prevTupleAccessor, prevTupleIndex);
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+            writeOutput(prevTupleAccessor, prevTupleIndex);
 
-			tupleBuilder.reset();
-			for (int j = 0; j < groupFields.length; j++) {
-				tupleBuilder.addField(currTupleAccessor, currTupleIndex, j);
-			}
-			aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex,
-					aggregateState);
-		} else {
-			aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0,
-					aggregateState);
-		}
-	}
+            tupleBuilder.reset();
+            for (int j = 0; j < groupFields.length; j++) {
+                tupleBuilder.addField(currTupleAccessor, currTupleIndex, j);
+            }
+            aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex, aggregateState);
+        } else {
+            aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
+        }
+    }
 
-	private void writeOutput(final FrameTupleAccessor lastTupleAccessor,
-			int lastTupleIndex) throws HyracksDataException {
+    private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+            throws HyracksDataException {
 
-		tupleBuilder.reset();
-		for (int j = 0; j < groupFields.length; j++) {
-			tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, j);
-		}
-		aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor,
-				lastTupleIndex, aggregateState);
+        tupleBuilder.reset();
+        for (int j = 0; j < groupFields.length; j++) {
+            tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, j);
+        }
+        aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
 
-		if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
-				tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
-			FrameUtils.flushFrame(outFrame, writer);
-			appender.reset(outFrame, true);
-			if (!appender.appendSkipEmptyField(
-					tupleBuilder.getFieldEndOffsets(),
-					tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
-				throw new HyracksDataException(
-						"The output cannot be fit into a frame.");
-			}
-		}
+        if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                tupleBuilder.getSize())) {
+            FrameUtils.flushFrame(outFrame, writer);
+            appender.reset(outFrame, true);
+            if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize())) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
 
-	}
+    }
 
-	private boolean sameGroup(FrameTupleAccessor a1, int t1Idx,
-			FrameTupleAccessor a2, int t2Idx) {
-		for (int i = 0; i < comparators.length; ++i) {
-			int fIdx = groupFields[i];
-			int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength()
-					+ a1.getFieldStartOffset(t1Idx, fIdx);
-			int l1 = a1.getFieldLength(t1Idx, fIdx);
-			int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength()
-					+ a2.getFieldStartOffset(t2Idx, fIdx);
-			int l2 = a2.getFieldLength(t2Idx, fIdx);
-			if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2
-					.getBuffer().array(), s2, l2) != 0) {
-				return false;
-			}
-		}
-		return true;
-	}
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+        for (int i = 0; i < comparators.length; ++i) {
+            int fIdx = groupFields[i];
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int l1 = a1.getFieldLength(t1Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
 
-	@Override
-	public void fail() throws HyracksDataException {
-		writer.fail();
-	}
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
 
-	@Override
-	public void close() throws HyracksDataException {
-		if (!first) {
-			writeOutput(copyFrameAccessor,
-					copyFrameAccessor.getTupleCount() - 1);
-			if (appender.getTupleCount() > 0) {
-				FrameUtils.flushFrame(outFrame, writer);
-			}
-		}
-		aggregateState.close();
-		writer.close();
-	}
+    @Override
+    public void close() throws HyracksDataException {
+        if (!first) {
+            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outFrame, writer);
+            }
+        }
+        aggregateState.close();
+        writer.close();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
index 66eed9d..2e781b5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldGroupAggregatorFactory.java
@@ -31,41 +31,46 @@
  *
  */
 public class AvgFieldGroupAggregatorFactory implements IFieldAggregateDescriptorFactory {
-    
+
     private static final long serialVersionUID = 1L;
-    
+
     private final int aggField;
-    
+
     private final boolean useObjectState;
-    
-    public AvgFieldGroupAggregatorFactory(int aggField, boolean useObjectState){
+
+    public AvgFieldGroupAggregatorFactory(int aggField, boolean useObjectState) {
         this.aggField = aggField;
         this.useObjectState = useObjectState;
     }
-    
-    /* (non-Javadoc)
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+     * IFieldAggregateDescriptorFactory
+     * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
      */
     @Override
-    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
-            RecordDescriptor inRecordDescriptor,
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-        
+
         return new IFieldAggregateDescriptor() {
-            
+
             @Override
             public void reset() {
             }
-            
+
             @Override
-            public void outputPartialResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
+            public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+                    throws HyracksDataException {
                 int sum, count;
                 if (!useObjectState) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
                     count = IntegerSerializerDeserializer.getInt(data, offset + 4);
                 } else {
-                    Integer[] fields = (Integer[])state.state;
+                    Integer[] fields = (Integer[]) state.state;
                     sum = fields[0];
                     count = fields[1];
                 }
@@ -73,74 +78,65 @@
                     fieldOutput.writeInt(sum);
                     fieldOutput.writeInt(count);
                 } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
-            
+
             @Override
-            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+                    throws HyracksDataException {
                 int sum, count;
                 if (!useObjectState) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
                     count = IntegerSerializerDeserializer.getInt(data, offset + 4);
                 } else {
-                    Integer[] fields = (Integer[])state.state;
+                    Integer[] fields = (Integer[]) state.state;
                     sum = fields[0];
                     count = fields[1];
                 }
                 try {
-                    fieldOutput.writeFloat((float)sum/count);
+                    fieldOutput.writeFloat((float) sum / count);
                 } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
-            
+
             @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, AggregateState state)
+            public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 int sum = 0;
                 int count = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
                 count += 1;
                 if (!useObjectState) {
                     try {
                         fieldOutput.writeInt(sum);
                         fieldOutput.writeInt(count);
                     } catch (IOException e) {
-                        throw new HyracksDataException(
-                                "I/O exception when initializing the aggregator.");
+                        throw new HyracksDataException("I/O exception when initializing the aggregator.");
                     }
                 } else {
-                    state.state = new Integer[]{sum, count};
+                    state.state = new Integer[] { sum, count };
                 }
             }
-            
+
             @Override
             public void close() {
                 // TODO Auto-generated method stub
-                
+
             }
-            
+
             @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, AggregateState state)
-                    throws HyracksDataException {
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
                 int sum = 0, count = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
                 count += 1;
                 if (!useObjectState) {
                     ByteBuffer buf = ByteBuffer.wrap(data);
@@ -149,10 +145,10 @@
                     buf.putInt(offset, sum);
                     buf.putInt(offset + 4, count);
                 } else {
-                    Integer[] fields = (Integer[])state.state;
+                    Integer[] fields = (Integer[]) state.state;
                     sum += fields[0];
                     count += fields[1];
-                    state.state = new Integer[]{sum, count};
+                    state.state = new Integer[] { sum, count };
                 }
             }
 
@@ -160,15 +156,15 @@
             public boolean needsObjectState() {
                 return useObjectState;
             }
-            
+
             @Override
             public boolean needsBinaryState() {
                 return !useObjectState;
             }
-            
+
             @Override
             public AggregateState createState() {
-                return new AggregateState(new Integer[]{0, 0});
+                return new AggregateState(new Integer[] { 0, 0 });
             }
 
         };
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
index cc30641..cc5c1e1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/AvgFieldMergeAggregatorFactory.java
@@ -30,43 +30,47 @@
 /**
  *
  */
-public class AvgFieldMergeAggregatorFactory implements
-        IFieldAggregateDescriptorFactory {
+public class AvgFieldMergeAggregatorFactory implements IFieldAggregateDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
-    
+
     private final int aggField;
-    
+
     private final boolean useObjectState;
-    
+
     public AvgFieldMergeAggregatorFactory(int aggField, boolean useObjectState) {
         this.aggField = aggField;
         this.useObjectState = useObjectState;
     }
-    
-    /* (non-Javadoc)
-     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+     * IFieldAggregateDescriptorFactory
+     * #createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
      */
     @Override
-    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
-            RecordDescriptor inRecordDescriptor,
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
-        
+
         return new IFieldAggregateDescriptor() {
-            
+
             @Override
             public void reset() {
             }
-            
+
             @Override
-            public void outputPartialResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
+            public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+                    throws HyracksDataException {
                 int sum, count;
                 if (!useObjectState) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
                     count = IntegerSerializerDeserializer.getInt(data, offset + 4);
                 } else {
-                    Integer[] fields = (Integer[])state.state;
+                    Integer[] fields = (Integer[]) state.state;
                     sum = fields[0];
                     count = fields[1];
                 }
@@ -74,48 +78,43 @@
                     fieldOutput.writeInt(sum);
                     fieldOutput.writeInt(count);
                 } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
-            
+
             @Override
-            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state) throws HyracksDataException {
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
+                    throws HyracksDataException {
                 int sum, count;
                 if (!useObjectState) {
                     sum = IntegerSerializerDeserializer.getInt(data, offset);
                     count = IntegerSerializerDeserializer.getInt(data, offset + 4);
                 } else {
-                    Integer[] fields = (Integer[])state.state;
+                    Integer[] fields = (Integer[]) state.state;
                     sum = fields[0];
                     count = fields[1];
                 }
                 try {
-                    fieldOutput.writeFloat((float)sum/count);
+                    fieldOutput.writeFloat((float) sum / count);
                 } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
-            
+
             @Override
             public void close() {
                 // TODO Auto-generated method stub
-                
+
             }
-            
+
             @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, AggregateState state)
-                    throws HyracksDataException {
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
                 int sum = 0, count = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
                 count += 1;
                 if (!useObjectState) {
                     ByteBuffer buf = ByteBuffer.wrap(data);
@@ -124,10 +123,10 @@
                     buf.putInt(offset, sum);
                     buf.putInt(offset + 4, count);
                 } else {
-                    Integer[] fields = (Integer[])state.state;
+                    Integer[] fields = (Integer[]) state.state;
                     sum += fields[0];
                     count += fields[1];
-                    state.state = new Integer[]{sum, count};
+                    state.state = new Integer[] { sum, count };
                 }
             }
 
@@ -135,47 +134,40 @@
             public boolean needsObjectState() {
                 return useObjectState;
             }
-            
+
             @Override
             public boolean needsBinaryState() {
                 return !useObjectState;
             }
-            
+
             @Override
             public AggregateState createState() {
-                return new AggregateState(new Integer[]{0, 0});
+                return new AggregateState(new Integer[] { 0, 0 });
             }
-            
+
             @Override
-            public void init(IFrameTupleAccessor accessor,
-                    int tIndex, DataOutput fieldOutput, AggregateState state)
+            public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 int sum = 0;
                 int count = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
-                count += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart + 4);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+                count += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart + 4);
                 if (!useObjectState) {
                     try {
                         fieldOutput.writeInt(sum);
                         fieldOutput.writeInt(count);
                     } catch (IOException e) {
-                        throw new HyracksDataException(
-                                "I/O exception when initializing the aggregator.");
+                        throw new HyracksDataException("I/O exception when initializing the aggregator.");
                     }
                 } else {
-                    state.state = new Integer[]{sum, count};
+                    state.state = new Integer[] { sum, count };
                 }
             }
         };
     }
 
 }
-
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
index e320603..9bfec8e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/CountFieldAggregatorFactory.java
@@ -30,8 +30,7 @@
 /**
  *
  */
-public class CountFieldAggregatorFactory implements
-        IFieldAggregateDescriptorFactory {
+public class CountFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -51,8 +50,7 @@
      * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
      */
     @Override
-    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
-            RecordDescriptor inRecordDescriptor,
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
         return new IFieldAggregateDescriptor() {
 
@@ -61,8 +59,7 @@
             }
 
             @Override
-            public void outputPartialResult(DataOutput fieldOutput,
-                    byte[] data, int offset, AggregateState state)
+            public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int count;
                 if (!useObjectState) {
@@ -73,14 +70,12 @@
                 try {
                     fieldOutput.writeInt(count);
                 } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
 
             @Override
-            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state)
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int count;
                 if (!useObjectState) {
@@ -91,22 +86,19 @@
                 try {
                     fieldOutput.writeInt(count);
                 } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
 
             @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, AggregateState state)
+            public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 int count = 1;
                 if (!useObjectState) {
                     try {
                         fieldOutput.writeInt(count);
                     } catch (IOException e) {
-                        throw new HyracksDataException(
-                                "I/O exception when initializing the aggregator.");
+                        throw new HyracksDataException("I/O exception when initializing the aggregator.");
                     }
                 } else {
                     state.state = count;
@@ -131,9 +123,8 @@
             }
 
             @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, AggregateState state)
-                    throws HyracksDataException {
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
                 int count = 1;
                 if (!useObjectState) {
                     ByteBuffer buf = ByteBuffer.wrap(data);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
index a3c0aa9..7d85deb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/IntSumFieldAggregatorFactory.java
@@ -30,8 +30,7 @@
 /**
  *
  */
-public class IntSumFieldAggregatorFactory implements
-        IFieldAggregateDescriptorFactory {
+public class IntSumFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -54,8 +53,7 @@
      * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
      */
     @Override
-    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
-            RecordDescriptor inRecordDescriptor,
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
 
         return new IFieldAggregateDescriptor() {
@@ -65,8 +63,7 @@
             }
 
             @Override
-            public void outputPartialResult(DataOutput fieldOutput,
-                    byte[] data, int offset, AggregateState state)
+            public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int sum;
                 if (!useObjectState) {
@@ -77,14 +74,12 @@
                 try {
                     fieldOutput.writeInt(sum);
                 } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
 
             @Override
-            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state)
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 int sum;
                 if (!useObjectState) {
@@ -95,31 +90,26 @@
                 try {
                     fieldOutput.writeInt(sum);
                 } catch (IOException e) {
-                    throw new HyracksDataException(
-                            "I/O exception when writing aggregation to the output buffer.");
+                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                 }
             }
 
             @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, AggregateState state)
+            public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
 
                 int sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
 
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
 
                 if (!useObjectState) {
                     try {
                         fieldOutput.writeInt(sum);
                     } catch (IOException e) {
-                        throw new HyracksDataException(
-                                "I/O exception when initializing the aggregator.");
+                        throw new HyracksDataException("I/O exception when initializing the aggregator.");
                     }
                 } else {
                     state.state = sum;
@@ -144,16 +134,13 @@
             }
 
             @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, AggregateState state)
-                    throws HyracksDataException {
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
                 int sum = 0;
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
-                sum += IntegerSerializerDeserializer.getInt(accessor
-                        .getBuffer().array(),
-                        tupleOffset + accessor.getFieldSlotsLength()
-                                + fieldStart);
+                sum += IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
 
                 if (!useObjectState) {
                     ByteBuffer buf = ByteBuffer.wrap(data);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
index 9c6ff68..94ebcbd 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MinMaxStringFieldAggregatorFactory.java
@@ -33,8 +33,7 @@
 /**
  *
  */
-public class MinMaxStringFieldAggregatorFactory implements
-        IFieldAggregateDescriptorFactory {
+public class MinMaxStringFieldAggregatorFactory implements IFieldAggregateDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -44,8 +43,7 @@
 
     private final boolean hasBinaryState;
 
-    public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax,
-            boolean hasBinaryState) {
+    public MinMaxStringFieldAggregatorFactory(int aggField, boolean isMax, boolean hasBinaryState) {
         this.aggField = aggField;
         this.isMax = isMax;
         this.hasBinaryState = hasBinaryState;
@@ -61,8 +59,7 @@
      * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[])
      */
     @Override
-    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
-            RecordDescriptor inRecordDescriptor,
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor) throws HyracksDataException {
         return new IFieldAggregateDescriptor() {
 
@@ -71,13 +68,11 @@
             }
 
             @Override
-            public void outputPartialResult(DataOutput fieldOutput,
-                    byte[] data, int offset, AggregateState state)
+            public void outputPartialResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 try {
                     if (hasBinaryState) {
-                        int stateIdx = IntegerSerializerDeserializer.getInt(
-                                data, offset);
+                        int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
                         Object[] storedState = (Object[]) state.state;
                         fieldOutput.writeUTF((String) storedState[stateIdx]);
                     } else {
@@ -90,13 +85,11 @@
             }
 
             @Override
-            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
-                    int offset, AggregateState state)
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data, int offset, AggregateState state)
                     throws HyracksDataException {
                 try {
                     if (hasBinaryState) {
-                        int stateIdx = IntegerSerializerDeserializer.getInt(
-                                data, offset);
+                        int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
                         Object[] storedState = (Object[]) state.state;
                         fieldOutput.writeUTF((String) storedState[stateIdx]);
                     } else {
@@ -109,18 +102,14 @@
             }
 
             @Override
-            public void init(IFrameTupleAccessor accessor, int tIndex,
-                    DataOutput fieldOutput, AggregateState state)
+            public void init(IFrameTupleAccessor accessor, int tIndex, DataOutput fieldOutput, AggregateState state)
                     throws HyracksDataException {
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
                 int fieldLength = accessor.getFieldLength(tIndex, aggField);
-                String strField = UTF8StringSerializerDeserializer.INSTANCE
-                        .deserialize(new DataInputStream(
-                                new ByteArrayInputStream(accessor.getBuffer()
-                                        .array(), tupleOffset
-                                        + accessor.getFieldSlotsLength()
-                                        + fieldStart, fieldLength)));
+                String strField = UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                        new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset
+                                + accessor.getFieldSlotsLength() + fieldStart, fieldLength)));
                 if (hasBinaryState) {
                     // Object-binary-state
                     Object[] storedState;
@@ -133,8 +122,7 @@
                     }
                     int stateCount = (Integer) (storedState[0]);
                     if (stateCount + 1 >= storedState.length) {
-                        storedState = Arrays.copyOf(storedState,
-                                storedState.length * 2);
+                        storedState = Arrays.copyOf(storedState, storedState.length * 2);
                         state.state = storedState;
                     }
 
@@ -159,44 +147,35 @@
             }
 
             @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    byte[] data, int offset, AggregateState state)
-                    throws HyracksDataException {
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset,
+                    AggregateState state) throws HyracksDataException {
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
                 int fieldLength = accessor.getFieldLength(tIndex, aggField);
-                String strField = UTF8StringSerializerDeserializer.INSTANCE
-                        .deserialize(new DataInputStream(
-                                new ByteArrayInputStream(accessor.getBuffer()
-                                        .array(), tupleOffset
-                                        + accessor.getFieldSlotsLength()
-                                        + fieldStart, fieldLength)));
+                String strField = UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+                        new ByteArrayInputStream(accessor.getBuffer().array(), tupleOffset
+                                + accessor.getFieldSlotsLength() + fieldStart, fieldLength)));
                 if (hasBinaryState) {
-                    int stateIdx = IntegerSerializerDeserializer.getInt(data,
-                            offset);
+                    int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
 
                     Object[] storedState = (Object[]) state.state;
 
                     if (isMax) {
-                        if (strField.length() > ((String) (storedState[stateIdx]))
-                                .length()) {
+                        if (strField.length() > ((String) (storedState[stateIdx])).length()) {
                             storedState[stateIdx] = strField;
                         }
                     } else {
-                        if (strField.length() < ((String) (storedState[stateIdx]))
-                                .length()) {
+                        if (strField.length() < ((String) (storedState[stateIdx])).length()) {
                             storedState[stateIdx] = strField;
                         }
                     }
                 } else {
                     if (isMax) {
-                        if (strField.length() > ((String) (state.state))
-                                .length()) {
+                        if (strField.length() > ((String) (state.state)).length()) {
                             state.state = strField;
                         }
                     } else {
-                        if (strField.length() < ((String) (state.state))
-                                .length()) {
+                        if (strField.length() < ((String) (state.state)).length()) {
                             state.state = strField;
                         }
                     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 81eb45f..5c6e94d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -30,21 +30,18 @@
 /**
  *
  */
-public class MultiFieldsAggregatorFactory implements
-        IAggregatorDescriptorFactory {
+public class MultiFieldsAggregatorFactory implements IAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
     private int[] keys;
 
-    public MultiFieldsAggregatorFactory(int[] keys, 
-            IFieldAggregateDescriptorFactory[] aggregatorFactories) {
+    public MultiFieldsAggregatorFactory(int[] keys, IFieldAggregateDescriptorFactory[] aggregatorFactories) {
         this.keys = keys;
         this.aggregatorFactories = aggregatorFactories;
     }
-    
-    public MultiFieldsAggregatorFactory(
-            IFieldAggregateDescriptorFactory[] aggregatorFactories) {
+
+    public MultiFieldsAggregatorFactory(IFieldAggregateDescriptorFactory[] aggregatorFactories) {
         this.aggregatorFactories = aggregatorFactories;
     }
 
@@ -58,21 +55,18 @@
      * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
      */
     @Override
-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
-            RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, final int[] keyFields,
-            final int[] keyFieldsInPartialResults) throws HyracksDataException {
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
+            throws HyracksDataException {
 
         final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
         for (int i = 0; i < aggregators.length; i++) {
-            aggregators[i] = aggregatorFactories[i].createAggregator(ctx,
-                    inRecordDescriptor, outRecordDescriptor);
+            aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor);
         }
-        
-        if(this.keys == null){
+
+        if (this.keys == null) {
             this.keys = keyFields;
         }
-        
 
         return new IAggregatorDescriptor() {
 
@@ -84,41 +78,34 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-                    IFrameTupleAccessor accessor, int tIndex,
+            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
 
                 tupleBuilder.reset();
                 for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                    tupleBuilder.addField(accessor, tIndex,
-                            keyFieldsInPartialResults[i]);
+                    tupleBuilder.addField(accessor, tIndex, keyFieldsInPartialResults[i]);
                 }
                 DataOutput dos = tupleBuilder.getDataOutput();
 
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 for (int i = 0; i < aggregators.length; i++) {
-                    int fieldOffset = accessor.getFieldStartOffset(tIndex,
-                            keys.length + i);
-                    aggregators[i].outputPartialResult(dos, accessor
-                            .getBuffer().array(),
-                            fieldOffset + accessor.getFieldSlotsLength()
-                                    + tupleOffset, ((AggregateState[]) state
-                                    .state)[i]);
+                    int fieldOffset = accessor.getFieldStartOffset(tIndex, keys.length + i);
+                    aggregators[i].outputPartialResult(dos, accessor.getBuffer().array(),
+                            fieldOffset + accessor.getFieldSlotsLength() + tupleOffset,
+                            ((AggregateState[]) state.state)[i]);
                     tupleBuilder.addFieldEndOffset();
                 }
 
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-                    IFrameTupleAccessor accessor, int tIndex,
+            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
 
                 tupleBuilder.reset();
 
                 for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                    tupleBuilder.addField(accessor, tIndex,
-                            keyFieldsInPartialResults[i]);
+                    tupleBuilder.addField(accessor, tIndex, keyFieldsInPartialResults[i]);
                 }
 
                 DataOutput dos = tupleBuilder.getDataOutput();
@@ -126,26 +113,21 @@
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
                 for (int i = 0; i < aggregators.length; i++) {
                     if (aggregators[i].needsBinaryState()) {
-                        int fieldOffset = accessor.getFieldStartOffset(tIndex,
-                                keys.length + i);
-                        aggregators[i].outputFinalResult(dos, accessor
-                                .getBuffer().array(),
-                                tupleOffset + accessor.getFieldSlotsLength()
-                                        + fieldOffset,
+                        int fieldOffset = accessor.getFieldStartOffset(tIndex, keys.length + i);
+                        aggregators[i].outputFinalResult(dos, accessor.getBuffer().array(),
+                                tupleOffset + accessor.getFieldSlotsLength() + fieldOffset,
                                 ((AggregateState[]) state.state)[i]);
                     } else {
-                        aggregators[i].outputFinalResult(dos, null, 0,
-                                ((AggregateState[]) state.state)[i]);
+                        aggregators[i].outputFinalResult(dos, null, 0, ((AggregateState[]) state.state)[i]);
                     }
                     tupleBuilder.addFieldEndOffset();
                 }
             }
 
             @Override
-            public void init(ArrayTupleBuilder tupleBuilder,
-                    IFrameTupleAccessor accessor, int tIndex,
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
-                
+
                 tupleBuilder.reset();
                 for (int i = 0; i < keys.length; i++) {
                     tupleBuilder.addField(accessor, tIndex, keys[i]);
@@ -153,8 +135,7 @@
                 DataOutput dos = tupleBuilder.getDataOutput();
 
                 for (int i = 0; i < aggregators.length; i++) {
-                    aggregators[i].init(accessor, tIndex, dos,
-                            ((AggregateState[]) state.state)[i]);
+                    aggregators[i].init(accessor, tIndex, dos, ((AggregateState[]) state.state)[i]);
                     if (aggregators[i].needsBinaryState()) {
                         tupleBuilder.addFieldEndOffset();
                     }
@@ -178,37 +159,26 @@
             }
 
             @Override
-            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-                    IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-                    AggregateState state) throws HyracksDataException {
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
                 if (stateAccessor != null) {
-                    int stateTupleOffset = stateAccessor
-                            .getTupleStartOffset(stateTupleIndex);
+                    int stateTupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
                     int fieldIndex = 0;
                     for (int i = 0; i < aggregators.length; i++) {
                         if (aggregators[i].needsBinaryState()) {
-                            int stateFieldOffset = stateAccessor
-                                    .getFieldStartOffset(stateTupleIndex,
-                                            keys.length + fieldIndex);
-                            aggregators[i].aggregate(
-                                    accessor,
-                                    tIndex,
-                                    stateAccessor.getBuffer().array(),
-                                    stateTupleOffset
-                                            + stateAccessor
-                                                    .getFieldSlotsLength()
-                                            + stateFieldOffset,
+                            int stateFieldOffset = stateAccessor.getFieldStartOffset(stateTupleIndex, keys.length
+                                    + fieldIndex);
+                            aggregators[i].aggregate(accessor, tIndex, stateAccessor.getBuffer().array(),
+                                    stateTupleOffset + stateAccessor.getFieldSlotsLength() + stateFieldOffset,
                                     ((AggregateState[]) state.state)[i]);
                             fieldIndex++;
                         } else {
-                            aggregators[i].aggregate(accessor, tIndex, null, 0,
-                                    ((AggregateState[]) state.state)[i]);
+                            aggregators[i].aggregate(accessor, tIndex, null, 0, ((AggregateState[]) state.state)[i]);
                         }
                     }
                 } else {
                     for (int i = 0; i < aggregators.length; i++) {
-                        aggregators[i].aggregate(accessor, tIndex, null, 0,
-                                ((AggregateState[]) state.state)[i]);
+                        aggregators[i].aggregate(accessor, tIndex, null, 0, ((AggregateState[]) state.state)[i]);
                     }
                 }
             }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
index eb58cb9..84caadf 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
@@ -72,8 +72,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions) {
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
                 recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
index b0e3dbe..85a6912 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
@@ -40,8 +40,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions) throws HyracksDataException {
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new ConstantTupleSourceOperatorNodePushable(ctx, fieldSlots, tupleData, tupleSize);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index b6a96e3..8e0ac79 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -32,8 +32,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions) {
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             @Override
             public void open() throws HyracksDataException {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 2d4a4d0..7effbb0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -61,8 +61,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions) {
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(),
                 recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 5d62607..6874f77 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -24,8 +24,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions)
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
             throws HyracksDataException {
         return new AbstractUnaryInputOperatorNodePushable() {
             private final IFrameWriter[] writers = new IFrameWriter[outputArity];
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index e7fb9ac..3e99000 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -72,8 +72,8 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
                 private CollectTaskState state;
 
@@ -117,8 +117,8 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-                final int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
                 private IOpenableDataWriter<Object[]> writer;
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index fcf4978..f55eb3d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -55,8 +55,8 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-                int partition, int nPartitions)
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
                 throws HyracksDataException {
             RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             return new UnionOperator(ctx, inRecordDesc);