checkpoint: added support on running aggregation using group-by runtime.
Aggregator interface is also updated in order to handle both
accumulating and running aggregation.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index c739b23..fac1f20 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -405,8 +405,11 @@
 	@Override
 	public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext ctx)
 			throws AlgebricksException {
-		propagateFDsAndEquivClasses(op, ctx);
-		return null;
+	    ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
+        ctx.putFDList(op, new ArrayList<FunctionalDependency>());
+        return null;
+        //		propagateFDsAndEquivClasses(op, ctx);
+        //		return null;
 	}
 
 	@Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index 51b009f..bcd998d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
@@ -34,6 +35,7 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansRunningAggregatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -76,13 +78,19 @@
         }
         // compile subplans and set the gby op. schema accordingly
         AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
-        IAggregatorDescriptorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys,
-                fdColumns);
+        IAggregatorDescriptorFactory aggregatorFactory;
+
+        if (((AbstractLogicalOperator) (gby.getNestedPlans().get(0).getRoots().get(0).getValue())).getOperatorTag() == LogicalOperatorTag.RUNNINGAGGREGATE) {
+            aggregatorFactory = new NestedPlansRunningAggregatorFactory(subplans, keys, fdColumns);
+        } else {
+            aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys, fdColumns);
+        }
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
                 columnList, context.getTypeEnvironment(op), context);
-        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
 
         PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
                 comparatorFactories, aggregatorFactory, recordDescriptor);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 21def02..5cbcdeb 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -62,22 +63,25 @@
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
-        IPartitioningProperty pp = null;
-        RunningAggregateOperator ragg = (RunningAggregateOperator) op;
-        for (Mutable<ILogicalExpression> exprRef : ragg.getExpressions()) {
-            StatefulFunctionCallExpression f = (StatefulFunctionCallExpression) exprRef.getValue();
-            IPartitioningProperty p = f.getRequiredPartitioningProperty();
-            if (p != null) {
-                if (pp == null) {
-                    pp = p;
-                } else {
-                    throw new IllegalStateException("Two stateful functions want to set partitioning requirements: "
-                            + pp + " and " + p);
-                }
-            }
-        }
-        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
-        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+
+        return emptyUnaryRequirements();
+
+        //        IPartitioningProperty pp = null;
+        //        RunningAggregateOperator ragg = (RunningAggregateOperator) op;
+        //        for (Mutable<ILogicalExpression> exprRef : ragg.getExpressions()) {
+        //            StatefulFunctionCallExpression f = (StatefulFunctionCallExpression) exprRef.getValue();
+        //            IPartitioningProperty p = f.getRequiredPartitioningProperty();
+        //            if (p != null) {
+        //                if (pp == null) {
+        //                    pp = p;
+        //                } else {
+        //                    throw new IllegalStateException("Two stateful functions want to set partitioning requirements: "
+        //                            + pp + " and " + p);
+        //                }
+        //            }
+        //        }
+        //        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
+        //        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
     @Override
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 8f2eaac..09354a5 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -28,11 +28,11 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private AlgebricksPipeline[] subplans;
@@ -95,7 +95,7 @@
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 for (int i = 0; i < pipelines.length; i++) {
                     outputWriter.setInputIdx(i);
@@ -114,6 +114,7 @@
                     offset = fieldEnds[i] - start;
                     tupleBuilder.addField(data, start, offset);
                 }
+                return true;
             }
 
             @Override
@@ -127,7 +128,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
new file mode 100644
index 0000000..dc9c805
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class NestedPlansRunningAggregatorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private AlgebricksPipeline[] subplans;
+    private int[] keyFieldIdx;
+    private int[] decorFieldIdx;
+
+    public NestedPlansRunningAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx, int[] decorFieldIdx) {
+        this.subplans = subplans;
+        this.keyFieldIdx = keyFieldIdx;
+        this.decorFieldIdx = decorFieldIdx;
+    }
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[], int[])
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
+            final IFrameWriter writer) throws HyracksDataException {
+        final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
+                decorFieldIdx.length, writer);
+        final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+        for (int i = 0; i < subplans.length; i++) {
+            try {
+                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
+
+        final ByteBuffer outputFrame = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputFrame, true);
+
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+
+                for (int i = 0; i < pipelines.length; ++i) {
+                    pipelines[i].open();
+                }
+
+                gbyTb.reset();
+                for (int i = 0; i < keyFieldIdx.length; ++i) {
+                    gbyTb.addField(accessor, tIndex, keyFieldIdx[i]);
+                }
+                for (int i = 0; i < decorFieldIdx.length; ++i) {
+                    gbyTb.addField(accessor, tIndex, decorFieldIdx[i]);
+                }
+
+                // aggregate the first tuple
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                    pipelines[i].forceFlush();
+                }
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                    pipelines[i].forceFlush();
+                }
+            }
+
+            @Override
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                return false;
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState();
+            }
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+
+            @Override
+            public void close() {
+
+            }
+        };
+    }
+
+    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
+            throws AlgebricksException, HyracksDataException {
+        // plug the operators
+        IFrameWriter start = writer;
+        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+            IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
+            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            if (i > 0) {
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
+            } else {
+                // the nts has the same input and output rec. desc.
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+            }
+            start = newRuntime;
+        }
+        return start;
+    }
+
+    private static class RunningAggregatorOutput implements IFrameWriter {
+
+        private FrameTupleAccessor[] tAccess;
+        private RecordDescriptor[] inputRecDesc;
+        private int inputIdx;
+        private ArrayTupleBuilder tb;
+        private ArrayTupleBuilder gbyTb;
+        private AlgebricksPipeline[] subplans;
+        private IFrameWriter outputWriter;
+        private ByteBuffer outputFrame;
+        private FrameTupleAppender outputAppender;
+
+        public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
+                int numDecors, IFrameWriter outputWriter) throws HyracksDataException {
+            this.subplans = subplans;
+            this.outputWriter = outputWriter;
+
+            // this.keyFieldIndexes = keyFieldIndexes;
+            int totalAggFields = 0;
+            this.inputRecDesc = new RecordDescriptor[subplans.length];
+            for (int i = 0; i < subplans.length; i++) {
+                RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
+                this.inputRecDesc[i] = rd[rd.length - 1];
+                totalAggFields += subplans[i].getOutputWidth();
+            }
+            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+            gbyTb = new ArrayTupleBuilder(numKeys + numDecors);
+
+            this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
+            for (int i = 0; i < inputRecDesc.length; i++) {
+                tAccess[i] = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc[i]);
+            }
+
+            this.outputFrame = ctx.allocateFrame();
+            this.outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+            this.outputAppender.reset(outputFrame, true);
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            int w = subplans[inputIdx].getOutputWidth();
+            IFrameTupleAccessor accessor = tAccess[inputIdx];
+            accessor.reset(buffer);
+            for (int tIndex = 0; tIndex < accessor.getTupleCount(); tIndex++) {
+                tb.reset();
+                byte[] data = gbyTb.getByteArray();
+                int[] fieldEnds = gbyTb.getFieldEndOffsets();
+                int start = 0;
+                int offset = 0;
+                for (int i = 0; i < fieldEnds.length; i++) {
+                    if (i > 0)
+                        start = fieldEnds[i - 1];
+                    offset = fieldEnds[i] - start;
+                    tb.addField(data, start, offset);
+                }
+                for (int f = 0; f < w; f++) {
+                    tb.addField(accessor, tIndex, f);
+                }
+                if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    FrameUtils.flushFrame(outputFrame, outputWriter);
+                    outputAppender.reset(outputFrame, true);
+                    if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new HyracksDataException(
+                                "Failed to write a running aggregation result into an empty frame: possibly the size of the result is too large.");
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            // clearFrame();
+        }
+
+        public void setInputIdx(int inputIdx) {
+            this.inputIdx = inputIdx;
+        }
+
+        public ArrayTupleBuilder getGroupByTupleBuilder() {
+            return gbyTb;
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+        }
+
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index d4161fc..39db3e1 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -25,11 +25,11 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
     private static final long serialVersionUID = 1L;
     private ICopySerializableAggregateFunctionFactory[] aggFactories;
 
@@ -109,7 +109,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 byte[] data = accessor.getBuffer().array();
                 int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -125,10 +125,11 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 byte[] data = accessor.getBuffer().array();
                 int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -144,6 +145,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 8e472ec..9b638b4 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -25,11 +25,11 @@
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class SimpleAlgebricksAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private IAggregateEvaluatorFactory[] aggFactories;
@@ -86,7 +86,7 @@
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
                 for (int i = 0; i < agg.length; i++) {
@@ -97,6 +97,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
@@ -118,7 +119,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 200e5c5..7ecb288 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -63,8 +63,6 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
-                    outRecordDesc, groupFields, groupFields);
             final ByteBuffer copyFrame = ctx.allocateFrame();
             final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
             copyFrameAccessor.reset(copyFrame);
@@ -78,6 +76,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
+                    IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+                            outRecordDesc, groupFields, groupFields, writer);
                     pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
                             outRecordDesc, writer);
                     pgw.open();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 5486fae..55dbcbb 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
 
@@ -64,5 +65,10 @@
         public void fail() throws HyracksDataException {
             writer.fail();
         }
+
+        public void forceFlush() throws HyracksDataException {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+        }
     }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index e92724a..cfc2bb6 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 
 public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -89,6 +90,10 @@
 
             @Override
             public void open() throws HyracksDataException {
+                if(!first){
+                    FrameUtils.flushFrame(frame, writer);
+                    appender.reset(frame, true);
+                }
                 initAccessAppendRef(ctx);
                 if (first) {
                     first = false;
@@ -96,12 +101,18 @@
                     for (int i = 0; i < n; i++) {
                         try {
                             raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
-                            raggs[i].init();
                         } catch (AlgebricksException ae) {
                             throw new HyracksDataException(ae);
                         }
                     }
                 }
+                for (int i = 0; i < runningAggregates.length; i++) {
+                    try {
+                        raggs[i].init();
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
                 writer.open();
             }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..e2fe892
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractAccumulatingAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory#createAggregator(edu.uci.ics.hyracks.api.context.IHyracksTaskContext, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor, int[], int[], edu.uci.ics.hyracks.api.comm.IFrameWriter)
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer)
+            throws HyracksDataException {
+        return this
+                .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+    }
+
+    abstract public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields,
+            final int[] keyFieldsInPartialResults) throws HyracksDataException;
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
new file mode 100644
index 0000000..3dcd4c3
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public abstract class AbstractRunningAggregatorDescriptor implements IAggregatorDescriptor {
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor#outputPartialResult(edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor, int, edu.uci.ics.hyracks.dataflow.std.group.AggregateState)
+     */
+    @Override
+    public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor#outputFinalResult(edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor, int, edu.uci.ics.hyracks.dataflow.std.group.AggregateState)
+     */
+    @Override
+    public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        return false;
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index ecd5284..df74a93 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -38,9 +38,6 @@
 import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
 
-/**
- *
- */
 public class HashSpillableTableFactory implements ISpillableTableFactory {
 
     private static final long serialVersionUID = 1L;
@@ -104,7 +101,7 @@
         }
 
         final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+                outRecordDescriptor, keyFields, keyFieldsInPartialResults, null);
 
         final AggregateState aggregateState = aggregator.createAggregateStates();
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index afe460c..f9d4cd3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -18,9 +18,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-/**
- *
- */
 public interface IAggregatorDescriptor {
 
     /**
@@ -82,9 +79,10 @@
      * @param offset
      * @param state
      *            The aggregation state.
+     * @return TODO
      * @throws HyracksDataException
      */
-    public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+    public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
             AggregateState state) throws HyracksDataException;
 
     /**
@@ -97,9 +95,10 @@
      * @param offset
      * @param state
      *            The aggregation state.
+     * @return TODO
      * @throws HyracksDataException
      */
-    public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+    public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
             AggregateState state) throws HyracksDataException;
 
     public void close();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index a4c0564..241956b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -16,17 +16,15 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-/**
- *
- */
 public interface IAggregatorDescriptorFactory extends Serializable {
 
     IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
-            throws HyracksDataException;
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+            IFrameWriter writer) throws HyracksDataException;
 
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 239f24f..08b2218 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -21,16 +21,14 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
-/**
- *
- */
-public class MultiFieldsAggregatorFactory implements IAggregatorDescriptorFactory {
+public class MultiFieldsAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
@@ -78,8 +76,8 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
                 DataOutput dos = tupleBuilder.getDataOutput();
 
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -90,11 +88,11 @@
                             ((AggregateState[]) state.state)[i]);
                     tupleBuilder.addFieldEndOffset();
                 }
-
+                return true;
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 DataOutput dos = tupleBuilder.getDataOutput();
 
@@ -110,6 +108,7 @@
                     }
                     tupleBuilder.addFieldEndOffset();
                 }
+                return true;
             }
 
             @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 21b3b60..c41e9ca 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -91,7 +91,7 @@
         }
 
         aggregator = mergerFactory.createAggregator(ctx, outRecordDescriptor, outRecordDescriptor, keyFields,
-                keyFieldsInPartialResults);
+                keyFieldsInPartialResults, writer);
         aggregateState = aggregator.createAggregateStates();
 
         storedKeys = new int[keyFields.length];
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
index f8ba003..3c0eb2b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
@@ -121,7 +121,7 @@
         }
 
         this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, fields,
-                keyFieldsInPartialResults);
+                keyFieldsInPartialResults, null);
 
         this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
         accumulatorSize = 0;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 35cb9e1..71af928 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -55,7 +55,7 @@
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
         final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, groupFields, groupFields);
+                outRecordDescriptor, groupFields, groupFields, writer);
         final ByteBuffer copyFrame = ctx.allocateFrame();
         final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
         copyFrameAccessor.reset(copyFrame);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 66efc03..c7a9f6d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -121,9 +121,9 @@
         for (int j = 0; j < groupFields.length; j++) {
             tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
         }
-        aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+        boolean hasOutput = aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
 
-        if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+        if (hasOutput && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                 tupleBuilder.getSize())) {
             FrameUtils.flushFrame(outFrame, writer);
             appender.reset(outFrame, true);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index acd766e..d243c8a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -99,7 +99,7 @@
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
                 ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
@@ -113,6 +113,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
@@ -121,7 +122,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }