Restructured the aggregate functions to simplify average and make all abstract classes more consistent.
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
index 8e396ed..4d804ee 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
@@ -15,22 +15,32 @@
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 
 import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
 import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
 import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
@@ -38,18 +48,38 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public abstract class AbstractSerializableAvgAggregateFunction implements ICopySerializableAggregateFunction {
+    private static final int SUM_FIELD_ID = 0;
+    private static final int COUNT_FIELD_ID = 1;
+
+    private static final int SUM_OFFSET = 0;
+    private static final int COUNT_OFFSET = 8;
+    protected static final int AGG_TYPE_OFFSET = 16;
 
     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
     private ICopyEvaluator eval;
-
     private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+    private ClosedRecordConstructorEval recordEval;
+
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ADOUBLE);
     @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ANULL);
 
@@ -62,102 +92,185 @@
         try {
             state.writeDouble(0.0);
             state.writeLong(0);
-            state.writeBoolean(false);
+            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
     }
 
-    @Override
-    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+    public abstract void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException;
+
+    public abstract void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+    public abstract void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+    protected abstract void processNull(byte[] state, int start);
+
+    protected void processDataValues(IFrameTupleReference tuple, byte[] state, int start, int len)
+            throws AlgebricksException {
+        if (skipStep(state, start)) {
+            return;
+        }
         inputVal.reset();
         eval.evaluate(tuple);
-        double sum = BufferSerDeUtil.getDouble(state, start);
-        long count = BufferSerDeUtil.getLong(state, start + 8);
-        if (inputVal.getLength() > 0) {
-            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-            if (typeTag != ATypeTag.NULL) {
-                ++count;
-            }
-            switch (typeTag) {
-                case INT8: {
-                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                    sum += val;
-                    break;
-                }
-                case INT16: {
-                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                    sum += val;
-                    break;
-                }
-                case INT32: {
-                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                    sum += val;
-                    break;
-                }
-                case INT64: {
-                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                    sum += val;
-                    break;
-                }
-                case FLOAT: {
-                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                    sum += val;
-                    break;
-                }
-                case DOUBLE: {
-                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                    sum += val;
-                    break;
-                }
-                case NULL: {
-                    processNull(state, start);
-                    break;
-                }
-                default: {
-                    throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
-                }
-            }
-            inputVal.reset();
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull(state, start);
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
         }
-        BufferSerDeUtil.writeDouble(sum, state, start);
-        BufferSerDeUtil.writeLong(count, state, start + 8);
+        ++count;
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+            }
+        }
+        inputVal.reset();
+        BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+        BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
     }
 
-    @Override
-    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-        double sum = BufferSerDeUtil.getDouble(state, start);
-        long count = BufferSerDeUtil.getLong(state, start + 8);
-        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-        if (count == 0) {
-            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-        } else {
+    protected void finishPartialResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        if (recordEval == null) {
+            ARecordType recType;
             try {
-                if (metNull)
-                    nullSerde.serialize(ANull.NULL, result);
-                else {
-                    aDouble.setValue(sum / count);
-                    doubleSerde.serialize(aDouble, result);
-                }
-            } catch (IOException e) {
+                recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+                        BuiltinType.AINT64 }, false);
+            } catch (AsterixException e) {
                 throw new AlgebricksException(e);
             }
+            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount },
+                    avgBytes, result);
         }
-    }
 
-    @Override
-    public void finishPartial(byte[] data, int start, int len, DataOutput partialResult) throws AlgebricksException {
         try {
-            partialResult.write(data, start, len);
+            if (aggType == ATypeTag.SYSTEM_NULL) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                }
+                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+            } else if (aggType == ATypeTag.NULL) {
+                result.writeByte(ATypeTag.NULL.serialize());
+            } else {
+                sumBytes.reset();
+                aDouble.setValue(sum);
+                doubleSerde.serialize(aDouble, sumBytesOutput);
+                countBytes.reset();
+                aInt64.setValue(count);
+                longSerde.serialize(aInt64, countBytesOutput);
+                recordEval.evaluate(null);
+            }
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
     }
 
-    protected void processNull(byte[] state, int start) {
-        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-        metNull = true;
-        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
+    protected void processPartialResults(IFrameTupleReference tuple, byte[] state, int start, int len)
+            throws AlgebricksException {
+        if (skipStep(state, start)) {
+            return;
+        }
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+        inputVal.reset();
+        eval.evaluate(tuple);
+        byte[] serBytes = inputVal.getByteArray();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        switch (typeTag) {
+            case NULL: {
+                processNull(state, start);
+                break;
+            }
+            case SYSTEM_NULL: {
+                // Ignore and return.
+                break;
+            }
+            case RECORD: {
+                // Expected.
+                int nullBitmapSize = 0;
+                int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+                        false);
+                sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+                int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+                        nullBitmapSize, false);
+                count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+
+                BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+                BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+                state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+                break;
+            }
+            default: {
+                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+            }
+        }
     }
+
+    protected void finishFinalResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+        try {
+            if (count == 0 || aggType == ATypeTag.NULL)
+                nullSerde.serialize(ANull.NULL, result);
+            else {
+                aDouble.setValue(sum / count);
+                doubleSerde.serialize(aDouble, result);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
index 9388671..71f709f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
@@ -36,6 +36,9 @@
  * count(NULL) returns NULL.
  */
 public abstract class AbstractSerializableCountAggregateFunction implements ICopySerializableAggregateFunction {
+    private static final int MET_NULL_OFFSET = 0;
+    private static final int COUNT_OFFSET = 1;
+
     private AMutableInt64 result = new AMutableInt64(-1);
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
@@ -72,8 +75,8 @@
         } else {
             cnt++;
         }
-        BufferSerDeUtil.writeBoolean(metNull, state, start);
-        BufferSerDeUtil.writeLong(cnt, state, start + 1);
+        BufferSerDeUtil.writeBoolean(metNull, state, start + MET_NULL_OFFSET);
+        BufferSerDeUtil.writeLong(cnt, state, start + COUNT_OFFSET);
     }
 
     @Override
@@ -98,8 +101,6 @@
     }
 
     protected void processNull(byte[] state, int start) {
-        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
-        metNull = true;
-        BufferSerDeUtil.writeBoolean(metNull, state, start);
+        BufferSerDeUtil.writeBoolean(true, state, start + MET_NULL_OFFSET);
     }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
index 5ef6f23..68c5b43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
@@ -43,7 +44,10 @@
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class AbstractSerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+public abstract class AbstractSerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+    protected static final int AGG_TYPE_OFFSET = 0;
+    private static final int SUM_OFFSET = 1;
+
     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
     private ICopyEvaluator eval;
     private AMutableDouble aDouble = new AMutableDouble(0);
@@ -53,13 +57,11 @@
     private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
     private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
     @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer serde;
-    private final boolean isLocalAgg;
+    public ISerializerDeserializer serde;
 
-    public AbstractSerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+    public AbstractSerializableSumAggregateFunction(ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
         eval = args[0].createEvaluator(inputVal);
-        this.isLocalAgg = isLocalAgg;
     }
 
     @Override
@@ -74,22 +76,28 @@
 
     @Override
     public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
-        double sum = BufferSerDeUtil.getDouble(state, start + 1);
+        if (skipStep(state, start)) {
+            return;
+        }
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
         inputVal.reset();
         eval.evaluate(tuple);
         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
         if (typeTag == ATypeTag.NULL) {
             processNull(state, start);
             return;
-        } else if (aggType == ATypeTag.NULL) {
-            return;
         } else if (aggType == ATypeTag.SYSTEM_NULL) {
             aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
-            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
-                    + aggType + ".");
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag
+                    + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
         }
+
+        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+
         switch (typeTag) {
             case INT8: {
                 byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
@@ -126,27 +134,22 @@
                 break;
             }
             case SYSTEM_NULL: {
-                // For global aggregates simply ignore system null here,
-                // but if all input value are system null, then we should return
-                // null in finish().
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                }
+                processSystemNull();
                 break;
             }
             default: {
                 throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
             }
         }
-        state[start] = aggType.serialize();
-        BufferSerDeUtil.writeDouble(sum, state, start + 1);
+        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+        BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
-        double sum = BufferSerDeUtil.getDouble(state, start + 1);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
         try {
             switch (aggType) {
                 case INT8: {
@@ -191,20 +194,16 @@
                     break;
                 }
                 case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                        serde.serialize(ANull.NULL, out);
-                    }
+                    finishSystemNull(out);
                     break;
                 }
+                default:
+                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
+                            + aggType + "). ");
             }
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
-
     }
 
     @Override
@@ -212,9 +211,11 @@
         finish(state, start, len, out);
     }
 
-    protected void processNull(byte[] state, int start) {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
-        aggType = ATypeTag.NULL;
-        state[start] = aggType.serialize();
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
     }
+    protected abstract void processNull(byte[] state, int start);
+    protected abstract void processSystemNull() throws AlgebricksException;
+    protected abstract void finishSystemNull(DataOutput out) throws IOException;
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
index 4f66c54..202933d 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
@@ -14,8 +14,13 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
@@ -23,4 +28,29 @@
         super(args);
     }
 
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processDataValues(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index 182214a..ccfeb43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -47,7 +47,7 @@
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableGlobalAvgAggregateFunction(args, false);
+                return new SerializableGlobalAvgAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
index 1a69283..c6d4b5a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
@@ -15,13 +15,43 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SerializableGlobalAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+public class SerializableGlobalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
-        super(args, isLocalAgg);
+    public SerializableGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
     }
 
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
index 5314ebe..7fa6b9c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -47,7 +47,7 @@
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableGlobalSqlAvgAggregateFunction(args, false);
+                return new SerializableGlobalSqlAvgAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
index 0821ea0..cc5043f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
@@ -15,17 +15,35 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
+import java.io.DataOutput;
+
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
-        super(args, isLocalAgg);
+    public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
     }
 
     @Override
     protected void processNull(byte[] state, int start) {
     }
-    
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index 9344f45..c015682 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -46,7 +46,7 @@
             private static final long serialVersionUID = 1L;
 
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableLocalAvgAggregateFunction(args, true);
+                return new SerializableLocalAvgAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
index aed5de2..00835d7 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
@@ -15,17 +15,43 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SerializableLocalAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+public class SerializableLocalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
-        super(args, isLocalAgg);
+    public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
     }
 
     @Override
-    protected void processNull(byte[] state, int start) {
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
     }
-    
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
index f1a9ad8..58415d3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -46,7 +46,7 @@
             private static final long serialVersionUID = 1L;
 
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableLocalSqlAvgAggregateFunction(args, true);
+                return new SerializableLocalSqlAvgAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
index 94c2dd4..0caeadf 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
@@ -15,17 +15,35 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
+import java.io.DataOutput;
+
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
-    public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
-        super(args, isLocalAgg);
+    public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
     }
 
     @Override
     protected void processNull(byte[] state, int start) {
     }
-    
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
index b922308..c1f77a3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
@@ -14,8 +14,11 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
+import java.io.DataOutput;
+
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
 
@@ -24,7 +27,22 @@
     }
 
     @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processDataValues(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    @Override
     protected void processNull(byte[] state, int start) {
     }
-    
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
index 9d6e46e..f80ade3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -14,18 +14,49 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 
 public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+    private final boolean isLocalAgg;
 
     public SerializableSqlSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
             throws AlgebricksException {
-        super(args, isLocalAgg);
+        super(args);
+        this.isLocalAgg = isLocalAgg;
     }
 
     @Override
     protected void processNull(byte[] state, int start) {
     }
-    
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
+        }
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index 6f20e0b..15e546a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -14,14 +14,57 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 
 public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+    private final boolean isLocalAgg;
 
     public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
             throws AlgebricksException {
-        super(args, isLocalAgg);
+        super(args);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    protected void processNull(byte[] state, int start) {
+        ATypeTag aggType = ATypeTag.NULL;
+        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
+        }
     }
 
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
index d322669..cfc5f1c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -17,8 +17,6 @@
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -37,7 +35,6 @@
 import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -56,17 +53,19 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunction {
+    private static final int SUM_FIELD_ID = 0;
+    private static final int COUNT_FIELD_ID = 1;
+
     private final ARecordType recType;
 
     private DataOutput out;
     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
     private ICopyEvaluator eval;
-    private ATypeTag aggType;
+    protected ATypeTag aggType;
     private double sum;
     private long count;
     private AMutableDouble aDouble = new AMutableDouble(0);
     private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private final boolean isLocalAgg;
 
     private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
     private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
@@ -81,30 +80,21 @@
     private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ADOUBLE);
     @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.AINT64);
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ANULL);
 
-    public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output, boolean isLocalAgg)
+    public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
             throws AlgebricksException {
         eval = args[0].createEvaluator(inputVal);
         out = output.getDataOutput();
-        this.isLocalAgg = isLocalAgg;
 
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
         ARecordType tmpRecType;
         try {
-            if (isLocalAgg) {
-                tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                        new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
-            } else {
-                tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                        new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-            }
+            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+                    BuiltinType.AINT64 }, false);
         } catch (AsterixException e) {
             throw new AlgebricksException(e);
         }
@@ -121,25 +111,24 @@
         count = 0;
     }
 
-    @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+    public abstract void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+    public abstract void finish() throws AlgebricksException;
+
+    public abstract void finishPartial() throws AlgebricksException;
+
+    protected abstract void processNull();
+
+    protected void processDataValues(IFrameTupleReference tuple) throws AlgebricksException {
+        if (skipStep()) {
+            return;
+        }
         inputVal.reset();
         eval.evaluate(tuple);
-        byte[] serBytes = inputVal.getByteArray();
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
         if (typeTag == ATypeTag.NULL) {
             processNull();
             return;
-        } else if (aggType == ATypeTag.NULL) {
-            return;
-        } else if (typeTag == ATypeTag.RECORD) {
-            // Global aggregate
-            if (isLocalAgg) {
-                throw new AlgebricksException("Record type can not be processed by in a local-avg operation.");
-            } else if (typeTag == ATypeTag.SYSTEM_NULL) {
-                // ignore
-                return;
-            }
         } else if (aggType == ATypeTag.SYSTEM_NULL) {
             aggType = typeTag;
         } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
@@ -148,11 +137,7 @@
         } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
             aggType = typeTag;
         }
-
-        if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != ATypeTag.RECORD) {
-            ++count;
-        }
-
+        ++count;
         switch (typeTag) {
             case INT8: {
                 byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
@@ -184,33 +169,6 @@
                 sum += val;
                 break;
             }
-            case NULL: {
-                break;
-            }
-            case SYSTEM_NULL: {
-                if (isLocalAgg) {
-                    throw new AlgebricksException("SYSTEM_NULL can not be processed by in a local-avg operation.");
-                }
-                break;
-            }
-            case RECORD: {
-                // Expected for global aggregate.
-                // The record length helps us determine whether the input record fields are nullable.
-                int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
-                int nullBitmapSize = 1;
-                if (recordLength == 29) {
-                    nullBitmapSize = 0;
-                }
-                int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize, false);
-                if (offset1 == 0) // the sum is null
-                    aggType = ATypeTag.NULL;
-                else
-                    sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
-                int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize, false);
-                if (offset2 != 0) // the count is not null
-                    count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-                break;
-            }
             default: {
                 throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
             }
@@ -218,70 +176,80 @@
         inputVal.reset();
     }
 
-    @Override
-    public void finish() throws AlgebricksException {
+    protected void finishPartialResults() throws AlgebricksException {
         try {
-            if (isLocalAgg) {
-                if (count == 0 && aggType != ATypeTag.NULL) {
-                    out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    return;
+            // Double check that count 0 is accounted
+            if (aggType == ATypeTag.SYSTEM_NULL) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
                 }
-                if (aggType == ATypeTag.NULL) {
-                    sumBytes.reset();
-                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                } else {
-                    sumBytes.reset();
-                    aDouble.setValue(sum);
-                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                }
+                out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+            } else if (aggType == ATypeTag.NULL) {
+                out.writeByte(ATypeTag.NULL.serialize());
+            } else {
+                sumBytes.reset();
+                aDouble.setValue(sum);
+                doubleSerde.serialize(aDouble, sumBytesOutput);
                 countBytes.reset();
                 aInt64.setValue(count);
-                intSerde.serialize(aInt64, countBytesOutput);
+                longSerde.serialize(aInt64, countBytesOutput);
                 recordEval.evaluate(null);
-            } else {
-                if (count == 0 || aggType == ATypeTag.NULL) {
-                    nullSerde.serialize(ANull.NULL, out);
-                } else {
-                    aDouble.setValue(sum / count);
-                    doubleSerde.serialize(aDouble, out);
-                }
             }
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
     }
 
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        if (isLocalAgg) {
-            finish();
-        } else {
-            if (count == 0) {
-                if (GlobalConfig.DEBUG) {
-                    GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
-                }
-            } else {
-                try {
-                    if (count == 0 || aggType == ATypeTag.NULL) {
-                        sumBytes.reset();
-                        nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                    } else {
-                        sumBytes.reset();
-                        aDouble.setValue(sum);
-                        doubleSerde.serialize(aDouble, sumBytesOutput);
-                    }
-                    countBytes.reset();
-                    aInt64.setValue(count);
-                    intSerde.serialize(aInt64, countBytesOutput);
-                    recordEval.evaluate(null);
-                } catch (IOException e) {
-                    throw new AlgebricksException(e);
-                }
+    protected void processPartialResults(IFrameTupleReference tuple) throws AlgebricksException {
+        if (skipStep()) {
+            return;
+        }
+        inputVal.reset();
+        eval.evaluate(tuple);
+        byte[] serBytes = inputVal.getByteArray();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        switch (typeTag) {
+            case NULL: {
+                processNull();
+                break;
+            }
+            case SYSTEM_NULL: {
+                // Ignore and return.
+                break;
+            }
+            case RECORD: {
+                // Expected.
+                int nullBitmapSize = 0;
+                int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+                        false);
+                sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+                int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+                        nullBitmapSize, false);
+                count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+                break;
+            }
+            default: {
+                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
             }
         }
     }
-    
-    protected void processNull() {
-        aggType = ATypeTag.NULL;
+
+    protected void finishFinalResults() throws AlgebricksException {
+        try {
+            if (count == 0 || aggType == ATypeTag.NULL) {
+                nullSerde.serialize(ANull.NULL, out);
+            } else {
+                aDouble.setValue(sum / count);
+                doubleSerde.serialize(aDouble, out);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
     }
+
+    protected boolean skipStep() {
+        return false;
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
index bae42a9..29e57f5 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
@@ -42,7 +42,7 @@
             .getSerializerDeserializer(BuiltinType.AINT64);
     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
     private ICopyEvaluator eval;
-    private long cnt;
+    protected long cnt;
     private DataOutput out;
 
     public AbstractCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
@@ -84,7 +84,5 @@
         finish();
     }
 
-    protected void processNull() {
-        cnt++;
-    }
+    protected abstract void processNull();
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
index 09c0171..29aba33 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -36,20 +36,18 @@
     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
     private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
     private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
-    private DataOutput out;
+    protected DataOutput out;
     private ICopyEvaluator eval;
-    private ATypeTag aggType;
+    protected ATypeTag aggType;
     private IBinaryComparator cmp;
     private ITypePromoteComputer tpc;
     private final boolean isMin;
-    private final boolean isLocalAgg;
 
-    public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
-            boolean isLocalAgg) throws AlgebricksException {
+    public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin)
+            throws AlgebricksException {
         out = provider.getDataOutput();
         eval = args[0].createEvaluator(inputVal);
         this.isMin = isMin;
-        this.isLocalAgg = isLocalAgg;
     }
 
     @Override
@@ -90,11 +88,8 @@
             // If a system_null is encountered locally, it would be an error; otherwise if it is seen
             // by a global aggregator, it is simple ignored.
             if (typeTag == ATypeTag.SYSTEM_NULL) {
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                } else {
-                    return;
-                }
+                processSystemNull();
+                return;
             }
 
             if (ATypeHierarchy.canPromote(aggType, typeTag)) {
@@ -153,12 +148,7 @@
                     break;
                 }
                 case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        out.writeByte(ATypeTag.NULL.serialize());
-                    }
+                    finishSystemNull();
                     break;
                 }
                 default: {
@@ -176,7 +166,13 @@
         finish();
     }
 
-    protected void processNull() {
-        aggType = ATypeTag.NULL;
+    protected boolean skipStep() {
+        return false;
     }
+
+    protected abstract void processNull();
+
+    protected abstract void processSystemNull() throws AlgebricksException;
+
+    protected abstract void finishSystemNull() throws IOException;
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
index b953f78..1332dd5 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -46,11 +46,11 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunction {
-    private DataOutput out;
+    protected DataOutput out;
     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
     private ICopyEvaluator eval;
     private double sum;
-    private ATypeTag aggType;
+    protected ATypeTag aggType;
     private AMutableDouble aDouble = new AMutableDouble(0);
     private AMutableFloat aFloat = new AMutableFloat(0);
     private AMutableInt64 aInt64 = new AMutableInt64(0);
@@ -58,15 +58,12 @@
     private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
     private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
     @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer serde;
+    protected ISerializerDeserializer serde;
 
-    private final boolean isLocalAgg;
-
-    public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+    public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider)
             throws AlgebricksException {
         out = provider.getDataOutput();
         eval = args[0].createEvaluator(inputVal);
-        this.isLocalAgg = isLocalAgg;
     }
 
     @Override
@@ -77,14 +74,15 @@
 
     @Override
     public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        if (skipStep()) {
+            return;
+        }
         inputVal.reset();
         eval.evaluate(tuple);
         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
         if (typeTag == ATypeTag.NULL) {
             processNull();
             return;
-        } else if (aggType == ATypeTag.NULL) {
-            return;
         } else if (aggType == ATypeTag.SYSTEM_NULL) {
             aggType = typeTag;
         } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
@@ -131,12 +129,7 @@
                 break;
             }
             case SYSTEM_NULL: {
-                // For global aggregates simply ignore system null here,
-                // but if all input value are system null, then we should return
-                // null in finish().
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                }
+                processSystemNull();
                 break;
             }
             default: {
@@ -192,13 +185,7 @@
                     break;
                 }
                 case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                        serde.serialize(ANull.NULL, out);
-                    }
+                    finishSystemNull();
                     break;
                 }
                 default:
@@ -215,7 +202,10 @@
         finish();
     }
 
-    protected void processNull() {
-        aggType = ATypeTag.NULL;
+    protected boolean skipStep() {
+        return false;
     }
+    protected abstract void processNull();
+    protected abstract void processSystemNull() throws AlgebricksException;
+    protected abstract void finishSystemNull() throws IOException;
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
index 4fc9e37..2470e5a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
@@ -15,14 +15,41 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
+import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class AvgAggregateFunction extends AbstractAvgAggregateFunction {
 
     public AvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
-        super(args, output, false);
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
     }
 
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
index 56afdd6..7c97fc2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
@@ -26,4 +26,9 @@
     public CountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
         super(args, output);
     }
+
+    protected void processNull() {
+        cnt++;
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
index de5185e..a63d4bc 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
@@ -15,14 +15,42 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
+import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class GlobalAvgAggregateFunction extends AbstractAvgAggregateFunction {
 
     public GlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
             throws AlgebricksException {
-        super(args, output, false);
+        super(args, output);
     }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processPartialResults(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
index 4619c4e..02c3c2c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
@@ -18,12 +18,28 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class GlobalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
 
     public GlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
             throws AlgebricksException {
-        super(args, output, false);
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processPartialResults(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finishPartialResults();
     }
 
     @Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
index 9f89c5a..2c7abd4 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
@@ -15,14 +15,42 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
+import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class LocalAvgAggregateFunction extends AbstractAvgAggregateFunction {
 
     public LocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
             throws AlgebricksException {
-        super(args, output, true);
+        super(args, output);
     }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
index 0fca8a9..6a97410 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
@@ -18,15 +18,32 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class LocalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
 
     public LocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
             throws AlgebricksException {
-        super(args, output, true);
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
     }
 
     @Override
     protected void processNull() {
     }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
index 7612c9c..0809701 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
@@ -14,14 +14,46 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
+import java.io.IOException;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 
-public class MinMaxAggregateFunction extends AbstractMinMaxAggregateFunction{
+public class MinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
+    private final boolean isLocalAgg;
 
     public MinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
             boolean isLocalAgg) throws AlgebricksException {
-        super(args, provider, isMin, isLocalAgg);
+        super(args, provider, isMin);
+        this.isLocalAgg = isLocalAgg;
     }
+
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            out.writeByte(ATypeTag.NULL.serialize());
+        }
+    }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
index 08d6353..11cf13f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
@@ -18,11 +18,27 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
 
     public SqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
-        super(args, output, false);
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
     }
 
     @Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
index 2111ccd..56236e3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
@@ -14,18 +14,40 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
+import java.io.IOException;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 
 public class SqlMinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
+    private final boolean isLocalAgg;
 
     public SqlMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
             boolean isLocalAgg) throws AlgebricksException {
-        super(args, provider, isMin, isLocalAgg);
+        super(args, provider, isMin);
+        this.isLocalAgg = isLocalAgg;
     }
 
     @Override
     protected void processNull() {
     }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            out.writeByte(ATypeTag.NULL.serialize());
+        }
+    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
index 4bd533d..4418288 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
@@ -14,18 +14,48 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 
 public class SqlSumAggregateFunction extends AbstractSumAggregateFunction {
-
+    private final boolean isLocalAgg;
+    
     public SqlSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
             throws AlgebricksException {
-        super(args, provider, isLocalAgg);
+        super(args, provider);
+        this.isLocalAgg = isLocalAgg;
     }
 
     @Override
     protected void processNull() {
     }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
+        }
+    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 52ec126..42b0346 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -14,14 +14,54 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 
 public class SumAggregateFunction extends AbstractSumAggregateFunction {
+    private final boolean isLocalAgg;
 
     public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
             throws AlgebricksException {
-        super(args, provider, isLocalAgg);
+        super(args, provider);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    protected boolean skipStep() { 
+        return (aggType == ATypeTag.NULL);
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
+        }
     }
 }