Updated the serializable aggregate functions to include sql aggregate functions. Includes changes to make sure only one copy of the base aggregate function is being used.
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
index 72e0045..a215a07 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
@@ -14,37 +14,15 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -62,129 +40,15 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
+            @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeBoolean(false);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-                        if (inputVal.getLength() > 0) {
-                            ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute AVG for values of type "
-                                            + typeTag);
-                                }
-                            }
-                            inputVal.reset();
-                        }
-                        BufferSerDeUtil.writeDouble(sum, state, start);
-                        BufferSerDeUtil.writeLong(count, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        if (count == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, result);
-                                else {
-                                    aDouble.setValue(sum / count);
-                                    doubleSerde.serialize(aDouble, result);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] data, int start, int len, DataOutput partialResult)
-                            throws AlgebricksException {
-                        try {
-                            partialResult.write(data, start, len);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                };
+                return new SerializableAvgAggregateFunction(args);
             }
         };
     }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
index a9b264c..0c9ce06 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
@@ -14,29 +14,15 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * count(NULL) returns NULL.
@@ -63,68 +49,7 @@
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-                    private AMutableInt64 result = new AMutableInt64(-1);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeBoolean(false);
-                            state.writeLong(0);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
-                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        if (typeTag == ATypeTag.NULL) {
-                            metNull = true;
-                        } else {
-                            cnt++;
-                        }
-                        BufferSerDeUtil.writeBoolean(metNull, state, start);
-                        BufferSerDeUtil.writeLong(cnt, state, start + 1);
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
-                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
-                        try {
-                            if (metNull) {
-                                nullSerde.serialize(ANull.NULL, out);
-                            } else {
-                                result.setValue(cnt);
-                                int64Serde.serialize(result, out);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput out)
-                            throws AlgebricksException {
-                        finish(state, start, len, out);
-                    }
-                };
+                return new SerializableCountAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index f720434..182214a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -15,44 +15,15 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableGlobalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -70,154 +41,13 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType _recType;
-        try {
-            _recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = _recType;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval;
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeBoolean(false);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        byte[] serBytes = inputVal.getByteArray();
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
-                        switch (typeTag) {
-                            case NULL: {
-                                metNull = true;
-                                break;
-                            }
-                            case SYSTEM_NULL: {
-                                // Ignore and return.
-                                return;
-                            }
-                            case RECORD: {
-                                // Expected.
-                                break;
-                            }
-                            default: {
-                                throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
-                            }
-                        }
-                        int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
-                        if (offset1 == 0) // the sum is null
-                            metNull = true;
-                        else
-                            globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
-                        int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, 1, true);
-                        if (offset2 != 0) // the count is not null
-                            globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
-                        BufferSerDeUtil.writeDouble(globalSum, state, start);
-                        BufferSerDeUtil.writeLong(globalCount, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        try {
-                            if (globalCount == 0 || metNull)
-                                nullSerde.serialize(ANull.NULL, result);
-                            else {
-                                aDouble.setValue(globalSum / globalCount);
-                                doubleSerde.serialize(aDouble, result);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput result)
-                            throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        if (recordEval == null)
-                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
-                                    evalCount }, avgBytes, result);
-
-                        try {
-                            if (globalCount == 0 || metNull) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(globalSum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(globalCount);
-                            longSerde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-                };
+                return new SerializableGlobalAvgAggregateFunction(args, false);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
index 5d93018..5314ebe 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -15,44 +15,15 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -70,154 +41,13 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType _recType;
-        try {
-            _recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = _recType;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval;
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeBoolean(false);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        byte[] serBytes = inputVal.getByteArray();
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
-                        switch (typeTag) {
-                            case NULL: {
-                                metNull = true;
-                                break;
-                            }
-                            case SYSTEM_NULL: {
-                                // Ignore and return.
-                                return;
-                            }
-                            case RECORD: {
-                                // Expected.
-                                break;
-                            }
-                            default: {
-                                throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
-                            }
-                        }
-                        int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
-                        if (offset1 == 0) // the sum is null
-                            metNull = true;
-                        else
-                            globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
-                        int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, 1, true);
-                        if (offset2 != 0) // the count is not null
-                            globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
-                        BufferSerDeUtil.writeDouble(globalSum, state, start);
-                        BufferSerDeUtil.writeLong(globalCount, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        try {
-                            if (globalCount == 0 || metNull)
-                                nullSerde.serialize(ANull.NULL, result);
-                            else {
-                                aDouble.setValue(globalSum / globalCount);
-                                doubleSerde.serialize(aDouble, result);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput result)
-                            throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        if (recordEval == null)
-                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
-                                    evalCount }, avgBytes, result);
-
-                        try {
-                            if (globalCount == 0 || metNull) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(globalSum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(globalCount);
-                            longSerde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-                };
+                return new SerializableGlobalSqlAvgAggregateFunction(args, false);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index 219204b..9344f45 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -15,48 +15,15 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableLocalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -74,163 +41,12 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType tmpRecType;
-        try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = tmpRecType;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-                    private ClosedRecordConstructorEval recordEval;
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
-                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-                            aggType = ATypeTag.NULL;
-                            return;
-                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-                            aggType = typeTag;
-                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
-                            throw new AlgebricksException("Unexpected type " + typeTag
-                                    + " in aggregation input stream. Expected type " + aggType + ".");
-                        }
-                        ++count;
-                        switch (typeTag) {
-                            case INT8: {
-                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT16: {
-                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT32: {
-                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT64: {
-                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case FLOAT: {
-                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case DOUBLE: {
-                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case NULL: {
-                                break;
-                            }
-                            default: {
-                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
-                            }
-                        }
-                        BufferSerDeUtil.writeDouble(sum, state, start);
-                        BufferSerDeUtil.writeLong(count, state, start + 8);
-                        state[start + 16] = aggType.serialize();
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
-                        if (recordEval == null) {
-                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
-                                    evalCount }, avgBytes, result);
-                        }
-                        try {
-                            if (count == 0) {
-                                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                                return;
-                            }
-                            if (aggType == ATypeTag.NULL) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(sum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(count);
-                            int64Serde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput partialResult)
-                            throws AlgebricksException {
-                        finish(state, start, len, partialResult);
-                    }
-
-                };
+                return new SerializableLocalAvgAggregateFunction(args, true);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
index 6c8b6e0..f1a9ad8 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -15,48 +15,15 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -74,163 +41,12 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType tmpRecType;
-        try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = tmpRecType;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-                    private ClosedRecordConstructorEval recordEval;
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
-                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-                            aggType = ATypeTag.NULL;
-                            return;
-                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-                            aggType = typeTag;
-                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
-                            throw new AlgebricksException("Unexpected type " + typeTag
-                                    + " in aggregation input stream. Expected type " + aggType + ".");
-                        }
-                        ++count;
-                        switch (typeTag) {
-                            case INT8: {
-                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT16: {
-                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT32: {
-                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT64: {
-                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case FLOAT: {
-                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case DOUBLE: {
-                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case NULL: {
-                                break;
-                            }
-                            default: {
-                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
-                            }
-                        }
-                        BufferSerDeUtil.writeDouble(sum, state, start);
-                        BufferSerDeUtil.writeLong(count, state, start + 8);
-                        state[start + 16] = aggType.serialize();
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
-                        if (recordEval == null) {
-                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
-                                    evalCount }, avgBytes, result);
-                        }
-                        try {
-                            if (count == 0) {
-                                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                                return;
-                            }
-                            if (aggType == ATypeTag.NULL) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(sum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(count);
-                            int64Serde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput partialResult)
-                            throws AlgebricksException {
-                        finish(state, start, len, partialResult);
-                    }
-
-                };
+                return new SerializableLocalSqlAvgAggregateFunction(args, true);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
index dc91711..2ce85c6 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -27,7 +27,6 @@
 public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_SUM;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableLocalSqlSumAggregateDescriptor();
@@ -36,7 +35,7 @@
 
     @Override
     public FunctionIdentifier getIdentifier() {
-        return FID;
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_SUM;
     }
 
     @Override
@@ -47,7 +46,7 @@
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableSumAggregateFunction(args, true);
+                return new SerializableSqlSumAggregateFunction(args, true);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
index 3f3aaff..bd6ead8 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -27,7 +27,6 @@
 public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableLocalSumAggregateDescriptor();
@@ -36,7 +35,7 @@
 
     @Override
     public FunctionIdentifier getIdentifier() {
-        return FID;
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
     }
 
     @Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
index 0ed0f13..786510b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
@@ -14,37 +14,15 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -62,128 +40,12 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeBoolean(false);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-                        if (inputVal.getLength() > 0) {
-                            ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute AVG for values of type "
-                                            + typeTag);
-                                }
-                            }
-                            inputVal.reset();
-                        }
-                        BufferSerDeUtil.writeDouble(sum, state, start);
-                        BufferSerDeUtil.writeLong(count, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        if (count == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, result);
-                                else {
-                                    aDouble.setValue(sum / count);
-                                    doubleSerde.serialize(aDouble, result);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] data, int start, int len, DataOutput partialResult)
-                            throws AlgebricksException {
-                        try {
-                            partialResult.write(data, start, len);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                };
+                return new SerializableSqlAvgAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
index 0e6be6c..5bfd6f2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
@@ -14,29 +14,15 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * count(NULL) returns NULL.
@@ -63,68 +49,7 @@
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-                    private AMutableInt64 result = new AMutableInt64(-1);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeBoolean(false);
-                            state.writeLong(0);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
-                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        if (typeTag == ATypeTag.NULL) {
-                            metNull = true;
-                        } else {
-                            cnt++;
-                        }
-                        BufferSerDeUtil.writeBoolean(metNull, state, start);
-                        BufferSerDeUtil.writeLong(cnt, state, start + 1);
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
-                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
-                        try {
-                            if (metNull) {
-                                nullSerde.serialize(ANull.NULL, out);
-                            } else {
-                                result.setValue(cnt);
-                                int64Serde.serialize(result, out);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput out)
-                            throws AlgebricksException {
-                        finish(state, start, len, out);
-                    }
-                };
+                return new SerializableSqlCountAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
index 3a08642..c68b3b3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -46,7 +46,7 @@
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new SerializableSumAggregateFunction(args, false);
+                return new SerializableSqlSumAggregateFunction(args, false);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index bea6ab8..6f20e0b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -14,199 +14,14 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableFloat aFloat = new AMutableFloat(0);
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-    @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer serde;
-    private final boolean isLocalAgg;
+public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
 
     public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
             throws AlgebricksException {
-        eval = args[0].createEvaluator(inputVal);
-        this.isLocalAgg = isLocalAgg;
+        super(args, isLocalAgg);
     }
 
-    @Override
-    public void init(DataOutput state) throws AlgebricksException {
-        try {
-            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-            state.writeDouble(0.0);
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
-        double sum = BufferSerDeUtil.getDouble(state, start + 1);
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
-        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-            aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
-            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
-                    + aggType + ".");
-        }
-        switch (typeTag) {
-            case INT8: {
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT16: {
-                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT32: {
-                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT64: {
-                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case FLOAT: {
-                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case DOUBLE: {
-                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case NULL: {
-                aggType = typeTag;
-                break;
-            }
-            case SYSTEM_NULL: {
-                // For global aggregates simply ignore system null here,
-                // but if all input value are system null, then we should return
-                // null in finish().
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                }
-                break;
-            }
-            default: {
-                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
-            }
-        }
-        state[start] = aggType.serialize();
-        BufferSerDeUtil.writeDouble(sum, state, start + 1);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
-        double sum = BufferSerDeUtil.getDouble(state, start + 1);
-        try {
-            switch (aggType) {
-                case INT8: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
-                    aInt8.setValue((byte) sum);
-                    serde.serialize(aInt8, out);
-                    break;
-                }
-                case INT16: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
-                    aInt16.setValue((short) sum);
-                    serde.serialize(aInt16, out);
-                    break;
-                }
-                case INT32: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-                    aInt32.setValue((int) sum);
-                    serde.serialize(aInt32, out);
-                    break;
-                }
-                case INT64: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-                    aInt64.setValue((long) sum);
-                    serde.serialize(aInt64, out);
-                    break;
-                }
-                case FLOAT: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
-                    aFloat.setValue((float) sum);
-                    serde.serialize(aFloat, out);
-                    break;
-                }
-                case DOUBLE: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    aDouble.setValue(sum);
-                    serde.serialize(aDouble, out);
-                    break;
-                }
-                case NULL: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                        serde.serialize(ANull.NULL, out);
-                    }
-                    break;
-                }
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-
-    }
-
-    @Override
-    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-        finish(state, start, len, out);
-    }
 }