merged master back
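
The merged changes add positional-variable support to UNNEST: UnnestOperator
carries an optional position offset expression, UnnestPOperator no longer
rejects positional variables, and UnnestRuntimeFactory appends the position as
an extra INT32 field. They also add NestedPlansRunningAggregatorFactory so that
a pre-clustered group-by can evaluate nested RUNNINGAGGREGATE plans, make the
IAggregatorDescriptor output methods return a boolean, and pass the downstream
IFrameWriter to IAggregatorDescriptorFactory.createAggregator.

For reference, a minimal sketch (not part of the patch) of how the extended
UnnestRuntimeFactory constructor is wired up; UnnestPOperator below does the
equivalent, and the variable names here stand in for whatever the job-gen
context provides:

    UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(
            outCol,                  // output column of the unnested item
            unnestingFactory,        // IUnnestingEvaluatorFactory for the unnest expression
            projectionList,          // projected columns, with the positional variable last
            true,                    // a positional variable is present
            posOffsetEvalFactory);   // IScalarEvaluatorFactory for the offset; null means offset 0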
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index 7efee75..b9ac62f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -23,12 +23,22 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class UnnestOperator extends AbstractUnnestOperator {
 
     private LogicalVariable positionalVariable;
+
+    /**
+     * The expression that sets the position offset for the positional variable.
+     */
+    private ILogicalExpression positionOffsetExpr;
+
+    /**
+     * The type of the positional variable.
+     */
     private Object positionalVariableType;
 
     public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
@@ -67,6 +77,14 @@
         return positionalVariableType;
     }
 
+    public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
+        this.positionOffsetExpr = posOffsetExpr;
+    }
+
+    public ILogicalExpression getPositionOffsetExpr() {
+        return this.positionOffsetExpr;
+    }
+
     @Override
     public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
         return visitor.visitUnnestOperator(this, arg);
@@ -88,4 +106,24 @@
         }
         return env;
     }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+                    throws AlgebricksException {
+                if (sources.length > 0) {
+                    target.addAllVariables(sources[0]);
+                }
+                for (LogicalVariable v : variables) {
+                    target.addVariable(v);
+                }
+                if (positionalVariable != null) {
+                    target.addVariable(positionalVariable);
+                }
+            }
+        };
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index cebddee..3a2617f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -404,7 +404,8 @@
     @Override
     public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
-        propagateFDsAndEquivClasses(op, ctx);
+        ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
+        ctx.putFDList(op, new ArrayList<FunctionalDependency>());
         return null;
     }
 
@@ -646,4 +647,4 @@
         return null;
     }
 
-}
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index 51b009f..bcd998d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
@@ -34,6 +35,7 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansRunningAggregatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -76,13 +78,19 @@
         }
         // compile subplans and set the gby op. schema accordingly
         AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
-        IAggregatorDescriptorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys,
-                fdColumns);
+        IAggregatorDescriptorFactory aggregatorFactory;
+
+        if (((AbstractLogicalOperator) (gby.getNestedPlans().get(0).getRoots().get(0).getValue())).getOperatorTag() == LogicalOperatorTag.RUNNINGAGGREGATE) {
+            aggregatorFactory = new NestedPlansRunningAggregatorFactory(subplans, keys, fdColumns);
+        } else {
+            aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys, fdColumns);
+        }
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
                 columnList, context.getTypeEnvironment(op), context);
-        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
 
         PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
                 comparatorFactories, aggregatorFactory, recordDescriptor);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 21def02..5cbcdeb 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -62,22 +63,25 @@
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
-        IPartitioningProperty pp = null;
-        RunningAggregateOperator ragg = (RunningAggregateOperator) op;
-        for (Mutable<ILogicalExpression> exprRef : ragg.getExpressions()) {
-            StatefulFunctionCallExpression f = (StatefulFunctionCallExpression) exprRef.getValue();
-            IPartitioningProperty p = f.getRequiredPartitioningProperty();
-            if (p != null) {
-                if (pp == null) {
-                    pp = p;
-                } else {
-                    throw new IllegalStateException("Two stateful functions want to set partitioning requirements: "
-                            + pp + " and " + p);
-                }
-            }
-        }
-        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
-        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+
+        return emptyUnaryRequirements();
+
+        //        IPartitioningProperty pp = null;
+        //        RunningAggregateOperator ragg = (RunningAggregateOperator) op;
+        //        for (Mutable<ILogicalExpression> exprRef : ragg.getExpressions()) {
+        //            StatefulFunctionCallExpression f = (StatefulFunctionCallExpression) exprRef.getValue();
+        //            IPartitioningProperty p = f.getRequiredPartitioningProperty();
+        //            if (p != null) {
+        //                if (pp == null) {
+        //                    pp = p;
+        //                } else {
+        //                    throw new IllegalStateException("Two stateful functions want to set partitioning requirements: "
+        //                            + pp + " and " + p);
+        //                }
+        //            }
+        //        }
+        //        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
+        //        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
index bf45e18..bf2f68a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -57,10 +58,7 @@
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         UnnestOperator unnest = (UnnestOperator) op;
-        if (unnest.getPositionalVariable() != null) {
-            throw new AlgebricksException("Cannot generate runtime for unnest with positional variable "
-                    + unnest.getPositionalVariable());
-        }
+
         int outCol = opSchema.findVariable(unnest.getVariable());
         ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
         IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -79,8 +77,18 @@
         UnnestingFunctionCallExpression agg = (UnnestingFunctionCallExpression) unnestExpr;
         IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+
+        // for position offset
+        ILogicalExpression posOffsetExpr = unnest.getPositionOffsetExpr();
+        IScalarEvaluatorFactory posOffsetExprEvalFactory = null;
+        if (posOffsetExpr != null) {
+            posOffsetExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(posOffsetExpr,
+                    context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+        }
+
         int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
-        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList);
+        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
+                unnest.getPositionalVariable() != null, posOffsetExprEvalFactory);
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
         ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
index af11b70..82e6970 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class IntroHashPartitionMergeExchange implements IAlgebraicRewriteRule {
@@ -60,6 +61,7 @@
                 hpe.getDomain());
         op1.setPhysicalOperator(hpme);
         op1.getInputs().get(0).setValue(op2.getInputs().get(0).getValue());
+        op1.computeDeliveredPhysicalProperties(context);
         return true;
     }
 
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 8f2eaac..09354a5 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -28,11 +28,11 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private AlgebricksPipeline[] subplans;
@@ -95,7 +95,7 @@
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 for (int i = 0; i < pipelines.length; i++) {
                     outputWriter.setInputIdx(i);
@@ -114,6 +114,7 @@
                     offset = fieldEnds[i] - start;
                     tupleBuilder.addField(data, start, offset);
                 }
+                return true;
             }
 
             @Override
@@ -127,7 +128,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
new file mode 100644
index 0000000..dc9c805
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class NestedPlansRunningAggregatorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+    private AlgebricksPipeline[] subplans;
+    private int[] keyFieldIdx;
+    private int[] decorFieldIdx;
+
+    public NestedPlansRunningAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx, int[] decorFieldIdx) {
+        this.subplans = subplans;
+        this.keyFieldIdx = keyFieldIdx;
+        this.decorFieldIdx = decorFieldIdx;
+    }
+
+    /**
+     * Creates an aggregator that pushes each incoming tuple through the nested subplans and flushes the results to the downstream writer immediately.
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
+            final IFrameWriter writer) throws HyracksDataException {
+        final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
+                decorFieldIdx.length, writer);
+        final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+        for (int i = 0; i < subplans.length; i++) {
+            try {
+                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
+
+        final ByteBuffer outputFrame = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outputAppender.reset(outputFrame, true);
+
+        return new IAggregatorDescriptor() {
+
+            @Override
+            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+
+                for (int i = 0; i < pipelines.length; ++i) {
+                    pipelines[i].open();
+                }
+
+                gbyTb.reset();
+                for (int i = 0; i < keyFieldIdx.length; ++i) {
+                    gbyTb.addField(accessor, tIndex, keyFieldIdx[i]);
+                }
+                for (int i = 0; i < decorFieldIdx.length; ++i) {
+                    gbyTb.addField(accessor, tIndex, decorFieldIdx[i]);
+                }
+
+                // aggregate the first tuple
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                    pipelines[i].forceFlush();
+                }
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                for (int i = 0; i < pipelines.length; i++) {
+                    outputWriter.setInputIdx(i);
+                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+                    pipelines[i].forceFlush();
+                }
+            }
+
+            @Override
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
+                return false;
+            }
+
+            @Override
+            public AggregateState createAggregateStates() {
+                return new AggregateState();
+            }
+
+            @Override
+            public void reset() {
+
+            }
+
+            @Override
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
+                throw new IllegalStateException("this method should not be called");
+            }
+
+            @Override
+            public void close() {
+
+            }
+        };
+    }
+
+    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
+            throws AlgebricksException, HyracksDataException {
+        // plug the operators
+        IFrameWriter start = writer;
+        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+            IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
+            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            if (i > 0) {
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
+            } else {
+                // the nts has the same input and output rec. desc.
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+            }
+            start = newRuntime;
+        }
+        return start;
+    }
+
+    private static class RunningAggregatorOutput implements IFrameWriter {
+
+        private FrameTupleAccessor[] tAccess;
+        private RecordDescriptor[] inputRecDesc;
+        private int inputIdx;
+        private ArrayTupleBuilder tb;
+        private ArrayTupleBuilder gbyTb;
+        private AlgebricksPipeline[] subplans;
+        private IFrameWriter outputWriter;
+        private ByteBuffer outputFrame;
+        private FrameTupleAppender outputAppender;
+
+        public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
+                int numDecors, IFrameWriter outputWriter) throws HyracksDataException {
+            this.subplans = subplans;
+            this.outputWriter = outputWriter;
+
+            // this.keyFieldIndexes = keyFieldIndexes;
+            int totalAggFields = 0;
+            this.inputRecDesc = new RecordDescriptor[subplans.length];
+            for (int i = 0; i < subplans.length; i++) {
+                RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
+                this.inputRecDesc[i] = rd[rd.length - 1];
+                totalAggFields += subplans[i].getOutputWidth();
+            }
+            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+            gbyTb = new ArrayTupleBuilder(numKeys + numDecors);
+
+            this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
+            for (int i = 0; i < inputRecDesc.length; i++) {
+                tAccess[i] = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc[i]);
+            }
+
+            this.outputFrame = ctx.allocateFrame();
+            this.outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+            this.outputAppender.reset(outputFrame, true);
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            int w = subplans[inputIdx].getOutputWidth();
+            IFrameTupleAccessor accessor = tAccess[inputIdx];
+            accessor.reset(buffer);
+            for (int tIndex = 0; tIndex < accessor.getTupleCount(); tIndex++) {
+                tb.reset();
+                byte[] data = gbyTb.getByteArray();
+                int[] fieldEnds = gbyTb.getFieldEndOffsets();
+                int start = 0;
+                int offset = 0;
+                for (int i = 0; i < fieldEnds.length; i++) {
+                    if (i > 0)
+                        start = fieldEnds[i - 1];
+                    offset = fieldEnds[i] - start;
+                    tb.addField(data, start, offset);
+                }
+                for (int f = 0; f < w; f++) {
+                    tb.addField(accessor, tIndex, f);
+                }
+                if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    FrameUtils.flushFrame(outputFrame, outputWriter);
+                    outputAppender.reset(outputFrame, true);
+                    if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                        throw new HyracksDataException(
+                                "Failed to write a running aggregation result into an empty frame: the result is likely too large to fit in a single frame.");
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            // clearFrame();
+        }
+
+        public void setInputIdx(int inputIdx) {
+            this.inputIdx = inputIdx;
+        }
+
+        public ArrayTupleBuilder getGroupByTupleBuilder() {
+            return gbyTb;
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+        }
+
+    }
+
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index d4161fc..39db3e1 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -25,11 +25,11 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+public class SerializableAggregatorDescriptorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
     private static final long serialVersionUID = 1L;
     private ICopySerializableAggregateFunctionFactory[] aggFactories;
 
@@ -109,7 +109,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 byte[] data = accessor.getBuffer().array();
                 int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -125,10 +125,11 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 byte[] data = accessor.getBuffer().array();
                 int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -144,6 +145,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 8e472ec..9b638b4 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -25,11 +25,11 @@
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+public class SimpleAlgebricksAccumulatingAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private IAggregateEvaluatorFactory[] aggFactories;
@@ -86,7 +86,7 @@
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 IAggregateEvaluator[] agg = (IAggregateEvaluator[]) state.state;
                 for (int i = 0; i < agg.length; i++) {
@@ -97,6 +97,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
@@ -118,7 +119,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 6c1dd5e..b079c3e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,8 +51,8 @@
             FrameUtils.flushFrame(frame, writer);
             appender.reset(frame, true);
             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException(
-                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+                throw new HyracksDataException(
+                        "Could not write frame: the tuple is too large to fit into a single frame. (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder)");
             }
         }
         if (flushFrame) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 200e5c5..7ecb288 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -63,8 +63,6 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
-                    outRecordDesc, groupFields, groupFields);
             final ByteBuffer copyFrame = ctx.allocateFrame();
             final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
             copyFrameAccessor.reset(copyFrame);
@@ -78,6 +76,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
+                    IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+                            outRecordDesc, groupFields, groupFields, writer);
                     pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
                             outRecordDesc, writer);
                     pgw.open();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 0499684..e5bede2 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -107,7 +107,8 @@
                             FrameUtils.flushFrame(frame, writer);
                             appender.reset(frame, true);
                             if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
-                                throw new IllegalStateException("Could not write frame.");
+                                throw new HyracksDataException(
+                                        "Could not write frame: subplan result is larger than the single-frame limit.");
                             }
                         }
                     }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 5486fae..55dbcbb 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
 
@@ -64,5 +65,10 @@
         public void fail() throws HyracksDataException {
             writer.fail();
         }
+
+        public void forceFlush() throws HyracksDataException {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+        }
     }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index e92724a..cfc2bb6 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 
 public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -89,6 +90,10 @@
 
             @Override
             public void open() throws HyracksDataException {
+                if (!first) {
+                    FrameUtils.flushFrame(frame, writer);
+                    appender.reset(frame, true);
+                }
                 initAccessAppendRef(ctx);
                 if (first) {
                     first = false;
@@ -96,12 +101,18 @@
                     for (int i = 0; i < n; i++) {
                         try {
                             raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
-                            raggs[i].init();
                         } catch (AlgebricksException ae) {
                             throw new HyracksDataException(ae);
                         }
                     }
                 }
+                for (int i = 0; i < runningAggregates.length; i++) {
+                    try {
+                        raggs[i].init();
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
                 writer.open();
             }
 
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index df70437..88dc19f 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -14,11 +14,15 @@
  */
 package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -26,6 +30,7 @@
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -36,12 +41,20 @@
     private int outColPos;
     private final boolean outColIsProjected;
 
+    private final boolean hasPositionalVariable;
+    private IScalarEvaluatorFactory posOffsetEvalFactory;
+
     // Each time step() is called on the aggregate, a new value is written in
     // its output. One byte is written before that value and is neglected.
     // By convention, if the aggregate function writes nothing, it means it
     // produced the last value.
 
     public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+        this(outCol, unnestingFactory, projectionList, false, null);
+    }
+
+    public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
+            boolean hasPositionalVariable, IScalarEvaluatorFactory posOffsetEvalFactory) {
         super(projectionList);
         this.outCol = outCol;
         this.unnestingFactory = unnestingFactory;
@@ -52,6 +65,11 @@
             }
         }
         outColIsProjected = outColPos >= 0;
+        this.hasPositionalVariable = hasPositionalVariable;
+        this.posOffsetEvalFactory = posOffsetEvalFactory;
+        if (this.posOffsetEvalFactory == null) {
+            this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]); // all-zero constant, i.e. a position offset of 0
+        }
     }
 
     @Override
@@ -68,6 +86,9 @@
             private IUnnestingEvaluator agg;
             private ArrayTupleBuilder tupleBuilder;
 
+            private int tupleCount;
+            private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
+
             @Override
             public void open() throws HyracksDataException {
                 initAccessAppendRef(ctx);
@@ -77,6 +98,7 @@
                     throw new HyracksDataException(ae);
                 }
                 tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+                tupleCount = 1;
                 writer.open();
             }
 
@@ -86,6 +108,16 @@
                 int nTuple = tAccess.getTupleCount();
                 for (int t = 0; t < nTuple; t++) {
                     tRef.reset(tAccess, t);
+
+                    try {
+                        offsetEval.evaluate(tRef, p);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+
+                    @SuppressWarnings("static-access")
+                    int offset = IntegerSerializerDeserializer.INSTANCE.getInt(p.getByteArray(), p.getStartOffset());
+
                     try {
                         agg.init(tRef);
                         boolean goon = true;
@@ -94,21 +126,33 @@
                             if (!agg.step(p)) {
                                 goon = false;
                             } else {
-                                if (!outColIsProjected) {
+
+                                if (!outColIsProjected && !hasPositionalVariable) {
                                     appendProjectionToFrame(t, projectionList);
                                 } else {
                                     for (int f = 0; f < outColPos; f++) {
                                         tupleBuilder.addField(tAccess, t, f);
                                     }
-                                    tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
-                                    for (int f = outColPos + 1; f < projectionList.length; f++) {
+                                    if (outColIsProjected) {
+                                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                                    } else {
+                                        tupleBuilder.addField(tAccess, t, outColPos);
+                                    }
+                                    for (int f = outColPos + 1; f < (hasPositionalVariable ? projectionList.length - 1
+                                            : projectionList.length); f++) {
                                         tupleBuilder.addField(tAccess, t, f);
                                     }
                                 }
+                                if (hasPositionalVariable) {
+                                    // Write the positional variable as an INT32
+                                    tupleBuilder.getDataOutput().writeByte(3);
+                                    tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+                                    tupleBuilder.addFieldEndOffset();
+                                }
                                 appendToFrameFromTupleBuilder(tupleBuilder);
                             }
                         } while (goon);
-                    } catch (AlgebricksException ae) {
+                    } catch (AlgebricksException | IOException ae) {
                         throw new HyracksDataException(ae);
                     }
                 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index c75f9f9..1af32de 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -25,4 +25,6 @@
     public IIOManager getIOManager();
 
     public ByteBuffer allocateFrame() throws HyracksDataException;
+    
+    public void deallocateFrames(int frameCount);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
index aa7008f..3ea81ef 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -41,4 +41,10 @@
         return ByteBuffer.allocate(frameSize);
     }
 
+    @Override
+    public void deallocateFrames(int frameCount) {
+        // TODO: frame deallocation is not tracked by the client context yet; this is a no-op for now.
+
+    }
+
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index c72ced1..20075ff 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -115,6 +115,11 @@
     }
 
     @Override
+    public void deallocateFrames(int frameCount) {
+        joblet.deallocateFrames(frameCount);
+    }
+
+    @Override
     public int getFrameSize() {
         return joblet.getFrameSize();
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
index 8add260..4829173 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/memory/MemoryManager.java
@@ -26,10 +26,11 @@
 
     @Override
     public boolean allocate(long memory) {
-        if (this.memory.addAndGet(-memory) < 0) {
-            this.memory.addAndGet(memory);
-            return false;
-        }
+        // Commented out for now because frame deallocation is not implemented yet.
+        //        if (this.memory.addAndGet(-memory) < 0) {
+        //            this.memory.addAndGet(memory);
+        //            return false;
+        //        }
         return true;
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..e2fe892
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractAccumulatingAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Adapts the writer-aware createAggregator(...) to the accumulating variant below, which does not need the output writer.
+     */
+    @Override
+    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer)
+            throws HyracksDataException {
+        return this
+                .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+    }
+
+    abstract public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields,
+            final int[] keyFieldsInPartialResults) throws HyracksDataException;
+
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
new file mode 100644
index 0000000..3dcd4c3
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public abstract class AbstractRunningAggregatorDescriptor implements IAggregatorDescriptor {
+
+    /**
+     * A running aggregator does not materialize partial results into the tuple builder; the false return tells callers not to append a tuple here.
+     */
+    @Override
+    public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        return false;
+    }
+
+    /**
+     * Likewise, no final result is materialized here; the concrete running aggregator is responsible for emitting its own output.
+     */
+    @Override
+    public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            AggregateState state) throws HyracksDataException {
+        return false;
+    }
+
+}
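
A running-style aggregator, by contrast, is expected to emit results on its own (typically through the IFrameWriter its factory now receives) rather than materializing them into the caller's tuple builder, which is why both overrides above return false. A minimal skeleton under that assumption; the class is hypothetical and leaves the remaining IAggregatorDescriptor methods to concrete subclasses:

import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.std.group.AbstractRunningAggregatorDescriptor;

// Hypothetical skeleton: a running aggregator keeps the downstream writer that
// its factory was given and pushes result frames to it on its own schedule.
// outputPartialResult/outputFinalResult are inherited and return false, which
// tells callers such as PreclusteredGroupWriter not to append a tuple for it.
public abstract class ExampleRunningAggregatorDescriptor extends AbstractRunningAggregatorDescriptor {

    protected final IFrameWriter outputWriter;

    protected ExampleRunningAggregatorDescriptor(IFrameWriter outputWriter) {
        this.outputWriter = outputWriter;
    }

    // The remaining IAggregatorDescriptor methods stay unimplemented in this
    // sketch; a concrete subclass provides them and flushes frames to
    // outputWriter as it aggregates.
}
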
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index ecd5284..df74a93 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -38,9 +38,6 @@
 import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
 
-/**
- *
- */
 public class HashSpillableTableFactory implements ISpillableTableFactory {
 
     private static final long serialVersionUID = 1L;
@@ -104,7 +101,7 @@
         }
 
         final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+                outRecordDescriptor, keyFields, keyFieldsInPartialResults, null);
 
         final AggregateState aggregateState = aggregator.createAggregateStates();
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
index afe460c..f9d4cd3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptor.java
@@ -18,9 +18,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-/**
- *
- */
 public interface IAggregatorDescriptor {
 
     /**
@@ -82,9 +79,10 @@
      * @param offset
      * @param state
      *            The aggregation state.
+     * @return true if a partial result has been written into the tuple builder and should be appended to the output; false if this aggregator produces no partial result here
      * @throws HyracksDataException
      */
-    public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+    public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
             AggregateState state) throws HyracksDataException;
 
     /**
@@ -97,9 +95,10 @@
      * @param offset
      * @param state
      *            The aggregation state.
+     * @return true if the final result has been written into the tuple builder and should be appended to the output; false if this aggregator emits its output through other means (e.g. a running aggregator)
      * @throws HyracksDataException
      */
-    public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+    public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
             AggregateState state) throws HyracksDataException;
 
     public void close();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index a4c0564..241956b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -16,17 +16,15 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-/**
- *
- */
 public interface IAggregatorDescriptorFactory extends Serializable {
 
     IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
-            throws HyracksDataException;
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults,
+            IFrameWriter writer) throws HyracksDataException;
 
 }
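
Factories that genuinely need the downstream writer implement this six-argument method directly instead of extending AbstractAccumulatingAggregatorDescriptorFactory. A minimal sketch; the class name, the null check, and the buildRunningAggregator hook are assumptions for illustration only:

import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

// Hypothetical push-style factory: the downstream writer is required, so the
// six-argument method is implemented directly.
public abstract class ExamplePushStyleAggregatorFactory implements IAggregatorDescriptorFactory {

    private static final long serialVersionUID = 1L;

    @Override
    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
            IFrameWriter writer) throws HyracksDataException {
        if (writer == null) {
            // Hash-based call sites (HashSpillableTableFactory, GroupingHashTable)
            // pass null; a push-style aggregator cannot run there.
            throw new HyracksDataException("this aggregator factory requires the downstream frame writer");
        }
        return buildRunningAggregator(ctx, outRecordDescriptor, keyFields, writer);
    }

    // Hypothetical hook: builds a descriptor (e.g. one extending
    // AbstractRunningAggregatorDescriptor) that emits frames through writer.
    protected abstract IAggregatorDescriptor buildRunningAggregator(IHyracksTaskContext ctx,
            RecordDescriptor outRecordDescriptor, int[] keyFields, IFrameWriter writer) throws HyracksDataException;
}
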
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index 239f24f..08b2218 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -21,16 +21,14 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
-/**
- *
- */
-public class MultiFieldsAggregatorFactory implements IAggregatorDescriptorFactory {
+public class MultiFieldsAggregatorFactory extends AbstractAccumulatingAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private final IFieldAggregateDescriptorFactory[] aggregatorFactories;
@@ -78,8 +76,8 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
                 DataOutput dos = tupleBuilder.getDataOutput();
 
                 int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -90,11 +88,11 @@
                             ((AggregateState[]) state.state)[i]);
                     tupleBuilder.addFieldEndOffset();
                 }
-
+                return true;
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 DataOutput dos = tupleBuilder.getDataOutput();
 
@@ -110,6 +108,7 @@
                     }
                     tupleBuilder.addFieldEndOffset();
                 }
+                return true;
             }
 
             @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 21b3b60..c41e9ca 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -91,7 +91,7 @@
         }
 
         aggregator = mergerFactory.createAggregator(ctx, outRecordDescriptor, outRecordDescriptor, keyFields,
-                keyFieldsInPartialResults);
+                keyFieldsInPartialResults, writer);
         aggregateState = aggregator.createAggregateStates();
 
         storedKeys = new int[keyFields.length];
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
index f8ba003..3c0eb2b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/GroupingHashTable.java
@@ -121,7 +121,7 @@
         }
 
         this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, fields,
-                keyFieldsInPartialResults);
+                keyFieldsInPartialResults, null);
 
         this.aggregateStates = new AggregateState[INIT_AGG_STATE_SIZE];
         accumulatorSize = 0;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 35cb9e1..71af928 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -55,7 +55,7 @@
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
         final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, groupFields, groupFields);
+                outRecordDescriptor, groupFields, groupFields, writer);
         final ByteBuffer copyFrame = ctx.allocateFrame();
         final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
         copyFrameAccessor.reset(copyFrame);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7773765..c7a9f6d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -44,6 +44,8 @@
 
     private boolean first;
 
+    private boolean isFailed = false;
+
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
             IFrameWriter writer) throws HyracksDataException {
@@ -119,9 +121,9 @@
         for (int j = 0; j < groupFields.length; j++) {
             tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
         }
-        aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+        boolean hasOutput = aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
 
-        if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+        if (hasOutput && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                 tupleBuilder.getSize())) {
             FrameUtils.flushFrame(outFrame, writer);
             appender.reset(outFrame, true);
@@ -149,12 +151,13 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        isFailed = true;
         writer.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-        if (!first) {
+        if (!isFailed && !first) {
             writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
             if (appender.getTupleCount() > 0) {
                 FrameUtils.flushFrame(outFrame, writer);
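
The new isFailed flag assumes the standard push-based IFrameWriter protocol: open(), any number of nextFrame() calls, fail() on error, and a final close(). A minimal sketch of an upstream driver under that protocol (the helper class and method names are hypothetical):

import java.nio.ByteBuffer;

import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

public final class WriterLifecycleSketch {
    private WriterLifecycleSketch() {
    }

    // Pushes frames into any IFrameWriter (such as PreclusteredGroupWriter).
    // If an error occurs, fail() is signalled before close(), so the group
    // writer sets isFailed and its close() skips flushing the pending group.
    public static void pushFrames(IFrameWriter writer, Iterable<ByteBuffer> frames) throws HyracksDataException {
        writer.open();
        try {
            for (ByteBuffer frame : frames) {
                writer.nextFrame(frame);
            }
        } catch (Exception e) {
            writer.fail();
            if (e instanceof HyracksDataException) {
                throw (HyracksDataException) e;
            }
            throw new HyracksDataException(e);
        } finally {
            writer.close();
        }
    }
}

With the guard in place, a failed run still closes the writer chain but no longer tries to emit the half-built last group downstream.
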
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 67d1796..4f586f9 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -565,6 +565,12 @@
         public ByteBuffer allocateFrame() {
             return ByteBuffer.allocate(FRAME_SIZE);
         }
+        
+        @Override
+        public void deallocateFrames(int frameCount) {
+            // No-op: frames from allocateFrame() are plain heap ByteBuffers, so there is nothing to release here.
+            
+        }
     }
 
     @Override
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 10b5582..f18099b 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -46,6 +46,10 @@
     public ByteBuffer allocateFrame() {
         return jobletContext.allocateFrame();
     }
+    
+    @Override
+    public void deallocateFrames(int frameCount) {
+    }
 
     @Override
     public int getFrameSize() {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index acd766e..d243c8a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -99,7 +99,7 @@
             }
 
             @Override
-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
                 ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
@@ -113,6 +113,7 @@
                         throw new HyracksDataException(e);
                     }
                 }
+                return true;
             }
 
             @Override
@@ -121,7 +122,7 @@
             }
 
             @Override
-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                     AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }