Merge branch 'master' into ecarm002/aggregate_sql_functions

* master:
  fix for issue 712
  Fixed a typo in the merge policy parameters.
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 299bbfb..8f0bab3 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -304,6 +304,57 @@
     public final static FunctionIdentifier SERIAL_LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "local-avg-serial", 1);
 
+    // sql aggregate functions
+    public final static FunctionIdentifier SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-avg", 1);
+    public final static FunctionIdentifier SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-count", 1);
+    public final static FunctionIdentifier SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-sum", 1);
+    public final static FunctionIdentifier LOCAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-sum", 1);
+    public final static FunctionIdentifier SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-max", 1);
+    public final static FunctionIdentifier LOCAL_SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-max", 1);
+    public final static FunctionIdentifier SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-min", 1);
+    public final static FunctionIdentifier LOCAL_SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-min", 1);
+    public final static FunctionIdentifier GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-global-sql-avg", 1);
+    public final static FunctionIdentifier LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-avg", 1);
+
+    public final static FunctionIdentifier SCALAR_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-avg", 1);
+    public final static FunctionIdentifier SCALAR_SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-count", 1);
+    public final static FunctionIdentifier SCALAR_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-sum", 1);
+    public final static FunctionIdentifier SCALAR_SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-max", 1);
+    public final static FunctionIdentifier SCALAR_SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-min", 1);
+    public final static FunctionIdentifier SCALAR_GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "global-sql-avg", 1);
+    public final static FunctionIdentifier SCALAR_LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sql-avg", 1);
+
+    // serializable sql aggregate functions
+    public final static FunctionIdentifier SERIAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-avg-serial", 1);
+    public final static FunctionIdentifier SERIAL_SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-count-serial", 1);
+    public final static FunctionIdentifier SERIAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-sum-serial", 1);
+    public final static FunctionIdentifier SERIAL_LOCAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sql-sum-serial", 1);
+    public final static FunctionIdentifier SERIAL_GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "global-sql-avg-serial", 1);
+    public final static FunctionIdentifier SERIAL_LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sql-avg-serial", 1);
+
     public final static FunctionIdentifier SCAN_COLLECTION = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "scan-collection", 1);
     public final static FunctionIdentifier SUBSET_COLLECTION = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -739,6 +790,29 @@
         addFunction(RANGE, AInt32TypeComputer.INSTANCE, true);
         addFunction(RECTANGLE_CONSTRUCTOR, OptionalARectangleTypeComputer.INSTANCE, true);
 
+        // SQL Aggregate Functions
+        addFunction(SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+        addFunction(SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+        addFunction(SQL_MAX, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_MAX, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addFunction(SQL_MIN, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_MIN, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addFunction(SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+        addFunction(SCALAR_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(SCALAR_GLOBAL_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addPrivateFunction(SCALAR_LOCAL_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_MAX, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_MIN, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_SUM, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addPrivateFunction(SERIAL_SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_GLOBAL_SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_LOCAL_SQL_AVG, NonTaggedLocalAvgTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_LOCAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+
         addFunction(SCALAR_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
         addFunction(SCALAR_COUNT, AInt64TypeComputer.INSTANCE, true);
         addPrivateFunction(SCALAR_GLOBAL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
@@ -913,6 +987,16 @@
         scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_MAX), getAsterixFunctionInfo(MAX));
         scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_MIN), getAsterixFunctionInfo(MIN));
         scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SUM), getAsterixFunctionInfo(SUM));
+        // SQL Aggregate Functions
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_AVG), getAsterixFunctionInfo(SQL_AVG));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_COUNT), getAsterixFunctionInfo(SQL_COUNT));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_GLOBAL_SQL_AVG),
+                getAsterixFunctionInfo(GLOBAL_SQL_AVG));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_LOCAL_SQL_AVG),
+                getAsterixFunctionInfo(LOCAL_SQL_AVG));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_MAX), getAsterixFunctionInfo(SQL_MAX));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_MIN), getAsterixFunctionInfo(SQL_MIN));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_SUM), getAsterixFunctionInfo(SQL_SUM));
     }
 
     static {
@@ -964,6 +1048,55 @@
         addAgg(SERIAL_LOCAL_SUM);
         addLocalAgg(SERIAL_SUM, SERIAL_LOCAL_SUM);
         addGlobalAgg(SERIAL_SUM, SERIAL_SUM);
+
+        // SQL Aggregate Functions
+        addAgg(SQL_AVG);
+        addAgg(LOCAL_SQL_AVG);
+        addAgg(GLOBAL_SQL_AVG);
+        addLocalAgg(SQL_AVG, LOCAL_SQL_AVG);
+        addGlobalAgg(SQL_AVG, GLOBAL_SQL_AVG);
+
+        addAgg(SQL_COUNT);
+        addLocalAgg(SQL_COUNT, SQL_COUNT);
+        addGlobalAgg(SQL_COUNT, SQL_SUM);
+
+        addAgg(SQL_MAX);
+        addAgg(LOCAL_SQL_MAX);
+        addLocalAgg(SQL_MAX, LOCAL_SQL_MAX);
+        addGlobalAgg(SQL_MAX, SQL_MAX);
+
+        addAgg(SQL_MIN);
+        addLocalAgg(SQL_MIN, LOCAL_SQL_MIN);
+        addGlobalAgg(SQL_MIN, SQL_MIN);
+
+        addAgg(SQL_SUM);
+        addAgg(LOCAL_SQL_SUM);
+        addLocalAgg(SQL_SUM, LOCAL_SQL_SUM);
+        addGlobalAgg(SQL_SUM, SQL_SUM);
+
+        // SQL serializable aggregate functions
+        addSerialAgg(SQL_AVG, SERIAL_SQL_AVG);
+        addSerialAgg(SQL_COUNT, SERIAL_SQL_COUNT);
+        addSerialAgg(SQL_SUM, SERIAL_SQL_SUM);
+        addSerialAgg(LOCAL_SQL_SUM, SERIAL_LOCAL_SQL_SUM);
+        addSerialAgg(LOCAL_SQL_AVG, SERIAL_LOCAL_SQL_AVG);
+        addSerialAgg(GLOBAL_SQL_AVG, SERIAL_GLOBAL_SQL_AVG);
+
+        addAgg(SERIAL_SQL_COUNT);
+        addLocalAgg(SERIAL_SQL_COUNT, SERIAL_SQL_COUNT);
+        addGlobalAgg(SERIAL_SQL_COUNT, SERIAL_SQL_SUM);
+
+        addAgg(SERIAL_SQL_AVG);
+        addAgg(SERIAL_LOCAL_SQL_AVG);
+        addAgg(SERIAL_GLOBAL_SQL_AVG);
+        addLocalAgg(SERIAL_SQL_AVG, SERIAL_LOCAL_SQL_AVG);
+        addGlobalAgg(SERIAL_SQL_AVG, SERIAL_GLOBAL_SQL_AVG);
+
+        addAgg(SERIAL_SQL_SUM);
+        addAgg(SERIAL_LOCAL_SQL_SUM);
+        addLocalAgg(SERIAL_SQL_SUM, SERIAL_LOCAL_SQL_SUM);
+        addGlobalAgg(SERIAL_SQL_SUM, SERIAL_SQL_SUM);
+
     }
 
     static {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..241ee46
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlAvgAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_AVG;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..1b581e4
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlCountAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_COUNT;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..f58d578
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMaxAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_MAX;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..da6e38d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMinAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_MIN;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..14aa6b7
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlSumAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_SUM;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
new file mode 100644
index 0000000..4d804ee
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSerializableAvgAggregateFunction implements ICopySerializableAggregateFunction {
+    private static final int SUM_FIELD_ID = 0;
+    private static final int COUNT_FIELD_ID = 1;
+
+    private static final int SUM_OFFSET = 0;
+    private static final int COUNT_OFFSET = 8;
+    protected static final int AGG_TYPE_OFFSET = 16;
+
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+    private ClosedRecordConstructorEval recordEval;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+
+    public AbstractSerializableAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+    }
+
+    @Override
+    public void init(DataOutput state) throws AlgebricksException {
+        try {
+            state.writeDouble(0.0);
+            state.writeLong(0);
+            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public abstract void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException;
+
+    public abstract void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+    public abstract void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+    protected abstract void processNull(byte[] state, int start);
+
+    protected void processDataValues(IFrameTupleReference tuple, byte[] state, int start, int len)
+            throws AlgebricksException {
+        if (skipStep(state, start)) {
+            return;
+        }
+        inputVal.reset();
+        eval.evaluate(tuple);
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull(state, start);
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+        ++count;
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+            }
+        }
+        inputVal.reset();
+        BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+        BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+    }
+
+    protected void finishPartialResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        if (recordEval == null) {
+            ARecordType recType;
+            try {
+                recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+                        BuiltinType.AINT64 }, false);
+            } catch (AsterixException e) {
+                throw new AlgebricksException(e);
+            }
+            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount },
+                    avgBytes, result);
+        }
+
+        try {
+            if (aggType == ATypeTag.SYSTEM_NULL) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                }
+                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+            } else if (aggType == ATypeTag.NULL) {
+                result.writeByte(ATypeTag.NULL.serialize());
+            } else {
+                sumBytes.reset();
+                aDouble.setValue(sum);
+                doubleSerde.serialize(aDouble, sumBytesOutput);
+                countBytes.reset();
+                aInt64.setValue(count);
+                longSerde.serialize(aInt64, countBytesOutput);
+                recordEval.evaluate(null);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    protected void processPartialResults(IFrameTupleReference tuple, byte[] state, int start, int len)
+            throws AlgebricksException {
+        if (skipStep(state, start)) {
+            return;
+        }
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+        inputVal.reset();
+        eval.evaluate(tuple);
+        byte[] serBytes = inputVal.getByteArray();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        switch (typeTag) {
+            case NULL: {
+                processNull(state, start);
+                break;
+            }
+            case SYSTEM_NULL: {
+                // Ignore and return.
+                break;
+            }
+            case RECORD: {
+                // Expected.
+                int nullBitmapSize = 0;
+                int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+                        false);
+                sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+                int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+                        nullBitmapSize, false);
+                count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+
+                BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+                BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+                state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+                break;
+            }
+            default: {
+                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+            }
+        }
+    }
+
+    protected void finishFinalResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+        try {
+            if (count == 0 || aggType == ATypeTag.NULL)
+                nullSerde.serialize(ANull.NULL, result);
+            else {
+                aDouble.setValue(sum / count);
+                doubleSerde.serialize(aDouble, result);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
new file mode 100644
index 0000000..71f709f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public abstract class AbstractSerializableCountAggregateFunction implements ICopySerializableAggregateFunction {
+    private static final int MET_NULL_OFFSET = 0;
+    private static final int COUNT_OFFSET = 1;
+
+    private AMutableInt64 result = new AMutableInt64(-1);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+
+    public AbstractSerializableCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+    }
+
+    @Override
+    public void init(DataOutput state) throws AlgebricksException {
+        try {
+            state.writeBoolean(false);
+            state.writeLong(0);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+        long cnt = BufferSerDeUtil.getLong(state, start + 1);
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull(state, start);
+        } else {
+            cnt++;
+        }
+        BufferSerDeUtil.writeBoolean(metNull, state, start + MET_NULL_OFFSET);
+        BufferSerDeUtil.writeLong(cnt, state, start + COUNT_OFFSET);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+        long cnt = BufferSerDeUtil.getLong(state, start + 1);
+        try {
+            if (metNull) {
+                nullSerde.serialize(ANull.NULL, out);
+            } else {
+                result.setValue(cnt);
+                int64Serde.serialize(result, out);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        finish(state, start, len, out);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        BufferSerDeUtil.writeBoolean(true, state, start + MET_NULL_OFFSET);
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
new file mode 100644
index 0000000..68c5b43
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+    protected static final int AGG_TYPE_OFFSET = 0;
+    private static final int SUM_OFFSET = 1;
+
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    @SuppressWarnings("rawtypes")
+    public ISerializerDeserializer serde;
+
+    public AbstractSerializableSumAggregateFunction(ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+    }
+
+    @Override
+    public void init(DataOutput state) throws AlgebricksException {
+        try {
+            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+            state.writeDouble(0.0);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        if (skipStep(state, start)) {
+            return;
+        }
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull(state, start);
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag
+                    + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
+        }
+
+        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                aggType = typeTag;
+                break;
+            }
+            case SYSTEM_NULL: {
+                processSystemNull();
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+            }
+        }
+        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+        BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        try {
+            switch (aggType) {
+                case INT8: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+                    aInt8.setValue((byte) sum);
+                    serde.serialize(aInt8, out);
+                    break;
+                }
+                case INT16: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+                    aInt16.setValue((short) sum);
+                    serde.serialize(aInt16, out);
+                    break;
+                }
+                case INT32: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+                    aInt32.setValue((int) sum);
+                    serde.serialize(aInt32, out);
+                    break;
+                }
+                case INT64: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+                    aInt64.setValue((long) sum);
+                    serde.serialize(aInt64, out);
+                    break;
+                }
+                case FLOAT: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+                    aFloat.setValue((float) sum);
+                    serde.serialize(aFloat, out);
+                    break;
+                }
+                case DOUBLE: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    aDouble.setValue(sum);
+                    serde.serialize(aDouble, out);
+                    break;
+                }
+                case NULL: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    finishSystemNull(out);
+                    break;
+                }
+                default:
+                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
+                            + aggType + "). ");
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        finish(state, start, len, out);
+    }
+
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
+    }
+    protected abstract void processNull(byte[] state, int start);
+    protected abstract void processSystemNull() throws AlgebricksException;
+    protected abstract void finishSystemNull(DataOutput out) throws IOException;
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
index 72e0045..a215a07 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
@@ -14,37 +14,15 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -62,129 +40,15 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
+            @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeBoolean(false);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-                        if (inputVal.getLength() > 0) {
-                            ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute AVG for values of type "
-                                            + typeTag);
-                                }
-                            }
-                            inputVal.reset();
-                        }
-                        BufferSerDeUtil.writeDouble(sum, state, start);
-                        BufferSerDeUtil.writeLong(count, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        if (count == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, result);
-                                else {
-                                    aDouble.setValue(sum / count);
-                                    doubleSerde.serialize(aDouble, result);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] data, int start, int len, DataOutput partialResult)
-                            throws AlgebricksException {
-                        try {
-                            partialResult.write(data, start, len);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                };
+                return new SerializableAvgAggregateFunction(args);
             }
         };
     }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
new file mode 100644
index 0000000..202933d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processDataValues(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
index a9b264c..0c9ce06 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
@@ -14,29 +14,15 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * count(NULL) returns NULL.
@@ -63,68 +49,7 @@
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-                    private AMutableInt64 result = new AMutableInt64(-1);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeBoolean(false);
-                            state.writeLong(0);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
-                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        if (typeTag == ATypeTag.NULL) {
-                            metNull = true;
-                        } else {
-                            cnt++;
-                        }
-                        BufferSerDeUtil.writeBoolean(metNull, state, start);
-                        BufferSerDeUtil.writeLong(cnt, state, start + 1);
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
-                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
-                        try {
-                            if (metNull) {
-                                nullSerde.serialize(ANull.NULL, out);
-                            } else {
-                                result.setValue(cnt);
-                                int64Serde.serialize(result, out);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput out)
-                            throws AlgebricksException {
-                        finish(state, start, len, out);
-                    }
-                };
+                return new SerializableCountAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java
new file mode 100644
index 0000000..7a6aacd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
+    public SerializableCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index f720434..ccfeb43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -15,44 +15,15 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableGlobalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -70,154 +41,13 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType _recType;
-        try {
-            _recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = _recType;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval;
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeBoolean(false);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        byte[] serBytes = inputVal.getByteArray();
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
-                        switch (typeTag) {
-                            case NULL: {
-                                metNull = true;
-                                break;
-                            }
-                            case SYSTEM_NULL: {
-                                // Ignore and return.
-                                return;
-                            }
-                            case RECORD: {
-                                // Expected.
-                                break;
-                            }
-                            default: {
-                                throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
-                            }
-                        }
-                        int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
-                        if (offset1 == 0) // the sum is null
-                            metNull = true;
-                        else
-                            globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
-                        int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, 1, true);
-                        if (offset2 != 0) // the count is not null
-                            globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
-                        BufferSerDeUtil.writeDouble(globalSum, state, start);
-                        BufferSerDeUtil.writeLong(globalCount, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        try {
-                            if (globalCount == 0 || metNull)
-                                nullSerde.serialize(ANull.NULL, result);
-                            else {
-                                aDouble.setValue(globalSum / globalCount);
-                                doubleSerde.serialize(aDouble, result);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput result)
-                            throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        if (recordEval == null)
-                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
-                                    evalCount }, avgBytes, result);
-
-                        try {
-                            if (globalCount == 0 || metNull) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(globalSum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(globalCount);
-                            longSerde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-                };
+                return new SerializableGlobalAvgAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..c6d4b5a
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableGlobalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..7fa6b9c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableGlobalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_GLOBAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableGlobalSqlAvgAggregateFunction(args);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..cc5043f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index 219204b..c015682 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -15,48 +15,15 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableLocalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -74,163 +41,12 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType tmpRecType;
-        try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = tmpRecType;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-                    private ClosedRecordConstructorEval recordEval;
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
-                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-                            aggType = ATypeTag.NULL;
-                            return;
-                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-                            aggType = typeTag;
-                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
-                            throw new AlgebricksException("Unexpected type " + typeTag
-                                    + " in aggregation input stream. Expected type " + aggType + ".");
-                        }
-                        ++count;
-                        switch (typeTag) {
-                            case INT8: {
-                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT16: {
-                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT32: {
-                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT64: {
-                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case FLOAT: {
-                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case DOUBLE: {
-                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case NULL: {
-                                break;
-                            }
-                            default: {
-                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
-                            }
-                        }
-                        BufferSerDeUtil.writeDouble(sum, state, start);
-                        BufferSerDeUtil.writeLong(count, state, start + 8);
-                        state[start + 16] = aggType.serialize();
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
-                        if (recordEval == null) {
-                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
-                                    evalCount }, avgBytes, result);
-                        }
-                        try {
-                            if (count == 0) {
-                                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                                return;
-                            }
-                            if (aggType == ATypeTag.NULL) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(sum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(count);
-                            int64Serde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput partialResult)
-                            throws AlgebricksException {
-                        finish(state, start, len, partialResult);
-                    }
-
-                };
+                return new SerializableLocalAvgAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
new file mode 100644
index 0000000..00835d7
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableLocalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..58415d3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableLocalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableLocalSqlAvgAggregateFunction(args);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..0caeadf
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..2ce85c6
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableLocalSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_SUM;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSqlSumAggregateFunction(args, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
index 3f3aaff..bd6ead8 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -27,7 +27,6 @@
 public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableLocalSumAggregateDescriptor();
@@ -36,7 +35,7 @@
 
     @Override
     public FunctionIdentifier getIdentifier() {
-        return FID;
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
     }
 
     @Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..786510b
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSqlAvgAggregateFunction(args);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..c1f77a3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processDataValues(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..5bfd6f2
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableSqlCountAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableSqlCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_SQL_COUNT;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSqlCountAggregateFunction(args);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
new file mode 100644
index 0000000..525b643
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableSqlCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
+    public SerializableSqlCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..c68b3b3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_SQL_SUM;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSqlSumAggregateFunction(args, false);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
new file mode 100644
index 0000000..f80ade3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+    private final boolean isLocalAgg;
+
+    public SerializableSqlSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+            throws AlgebricksException {
+        super(args);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
+        }
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index bea6ab8..15e546a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -17,196 +17,54 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
 import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableFloat aFloat = new AMutableFloat(0);
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-    @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer serde;
+public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
     private final boolean isLocalAgg;
 
     public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
             throws AlgebricksException {
-        eval = args[0].createEvaluator(inputVal);
+        super(args);
         this.isLocalAgg = isLocalAgg;
     }
 
-    @Override
-    public void init(DataOutput state) throws AlgebricksException {
-        try {
-            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-            state.writeDouble(0.0);
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
+    protected void processNull(byte[] state, int start) {
+        ATypeTag aggType = ATypeTag.NULL;
+        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
     }
 
     @Override
-    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
-        double sum = BufferSerDeUtil.getDouble(state, start + 1);
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
-        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-            aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
-            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
-                    + aggType + ".");
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
         }
-        switch (typeTag) {
-            case INT8: {
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT16: {
-                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT32: {
-                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT64: {
-                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case FLOAT: {
-                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case DOUBLE: {
-                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case NULL: {
-                aggType = typeTag;
-                break;
-            }
-            case SYSTEM_NULL: {
-                // For global aggregates simply ignore system null here,
-                // but if all input value are system null, then we should return
-                // null in finish().
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                }
-                break;
-            }
-            default: {
-                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
-            }
-        }
-        state[start] = aggType.serialize();
-        BufferSerDeUtil.writeDouble(sum, state, start + 1);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
-        double sum = BufferSerDeUtil.getDouble(state, start + 1);
-        try {
-            switch (aggType) {
-                case INT8: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
-                    aInt8.setValue((byte) sum);
-                    serde.serialize(aInt8, out);
-                    break;
-                }
-                case INT16: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
-                    aInt16.setValue((short) sum);
-                    serde.serialize(aInt16, out);
-                    break;
-                }
-                case INT32: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-                    aInt32.setValue((int) sum);
-                    serde.serialize(aInt32, out);
-                    break;
-                }
-                case INT64: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-                    aInt64.setValue((long) sum);
-                    serde.serialize(aInt64, out);
-                    break;
-                }
-                case FLOAT: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
-                    aFloat.setValue((float) sum);
-                    serde.serialize(aFloat, out);
-                    break;
-                }
-                case DOUBLE: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    aDouble.setValue(sum);
-                    serde.serialize(aDouble, out);
-                    break;
-                }
-                case NULL: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                        serde.serialize(ANull.NULL, out);
-                    }
-                    break;
-                }
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
         }
-
     }
 
-    @Override
-    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-        finish(state, start, len, out);
-    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
new file mode 100644
index 0000000..cfc5f1c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunction {
+    private static final int SUM_FIELD_ID = 0;
+    private static final int COUNT_FIELD_ID = 1;
+
+    private final ARecordType recType;
+
+    private DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    protected ATypeTag aggType;
+    private double sum;
+    private long count;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+    private ClosedRecordConstructorEval recordEval;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+
+    public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        out = output.getDataOutput();
+
+        ARecordType tmpRecType;
+        try {
+            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+                    BuiltinType.AINT64 }, false);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+
+        recType = tmpRecType;
+        recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount }, avgBytes,
+                out);
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        sum = 0.0;
+        count = 0;
+    }
+
+    public abstract void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+    public abstract void finish() throws AlgebricksException;
+
+    public abstract void finishPartial() throws AlgebricksException;
+
+    protected abstract void processNull();
+
+    protected void processDataValues(IFrameTupleReference tuple) throws AlgebricksException {
+        if (skipStep()) {
+            return;
+        }
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+        ++count;
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+            }
+        }
+        inputVal.reset();
+    }
+
+    protected void finishPartialResults() throws AlgebricksException {
+        try {
+            // Double check that count 0 is accounted
+            if (aggType == ATypeTag.SYSTEM_NULL) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                }
+                out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+            } else if (aggType == ATypeTag.NULL) {
+                out.writeByte(ATypeTag.NULL.serialize());
+            } else {
+                sumBytes.reset();
+                aDouble.setValue(sum);
+                doubleSerde.serialize(aDouble, sumBytesOutput);
+                countBytes.reset();
+                aInt64.setValue(count);
+                longSerde.serialize(aInt64, countBytesOutput);
+                recordEval.evaluate(null);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    protected void processPartialResults(IFrameTupleReference tuple) throws AlgebricksException {
+        if (skipStep()) {
+            return;
+        }
+        inputVal.reset();
+        eval.evaluate(tuple);
+        byte[] serBytes = inputVal.getByteArray();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        switch (typeTag) {
+            case NULL: {
+                processNull();
+                break;
+            }
+            case SYSTEM_NULL: {
+                // Ignore and return.
+                break;
+            }
+            case RECORD: {
+                // Expected.
+                int nullBitmapSize = 0;
+                int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+                        false);
+                sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+                int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+                        nullBitmapSize, false);
+                count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+                break;
+            }
+            default: {
+                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+            }
+        }
+    }
+
+    protected void finishFinalResults() throws AlgebricksException {
+        try {
+            if (count == 0 || aggType == ATypeTag.NULL) {
+                nullSerde.serialize(ANull.NULL, out);
+            } else {
+                aDouble.setValue(sum / count);
+                doubleSerde.serialize(aDouble, out);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    protected boolean skipStep() {
+        return false;
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
new file mode 100644
index 0000000..29e57f5
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
+ */
+public abstract class AbstractCountAggregateFunction implements ICopyAggregateFunction {
+    private AMutableInt64 result = new AMutableInt64(-1);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    protected long cnt;
+    private DataOutput out;
+
+    public AbstractCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        out = output.getDataOutput();
+    }
+
+    @Override
+    public void init() {
+        cnt = 0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        // Ignore SYSTEM_NULL.
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+        } else if (typeTag != ATypeTag.SYSTEM_NULL) {
+            cnt++;
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            result.setValue(cnt);
+            int64Serde.serialize(result, out);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    protected abstract void processNull();
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
new file mode 100644
index 0000000..29aba33
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.om.types.hierachy.ITypePromoteComputer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateFunction {
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
+    private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
+    protected DataOutput out;
+    private ICopyEvaluator eval;
+    protected ATypeTag aggType;
+    private IBinaryComparator cmp;
+    private ITypePromoteComputer tpc;
+    private final boolean isMin;
+
+    public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin)
+            throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isMin = isMin;
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        outputVal.reset();
+        tempValForCasting.reset();
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+            return;
+        } else if (aggType == ATypeTag.NULL) {
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            if (typeTag == ATypeTag.SYSTEM_NULL) {
+                // Ignore.
+                return;
+            }
+            // First value encountered. Set type, comparator, and initial value.
+            aggType = typeTag;
+            // Set comparator.
+            IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(aggType, isMin);
+            cmp = cmpFactory.createBinaryComparator();
+            // Initialize min value.
+            outputVal.assign(inputVal);
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else {
+
+            // If a system_null is encountered locally, it would be an error; otherwise if it is seen
+            // by a global aggregator, it is simple ignored.
+            if (typeTag == ATypeTag.SYSTEM_NULL) {
+                processSystemNull();
+                return;
+            }
+
+            if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+                tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
+                aggType = typeTag;
+                cmp = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
+                        .createBinaryComparator();
+                if (tpc != null) {
+                    tempValForCasting.reset();
+                    try {
+                        tpc.promote(outputVal.getByteArray(), outputVal.getStartOffset() + 1,
+                                outputVal.getLength() - 1, tempValForCasting);
+                    } catch (IOException e) {
+                        throw new AlgebricksException(e);
+                    }
+                    outputVal.reset();
+                    outputVal.assign(tempValForCasting);
+                }
+                if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+                        outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+                    outputVal.assign(inputVal);
+                }
+
+            } else {
+                tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
+                if (tpc != null) {
+                    tempValForCasting.reset();
+                    try {
+                        tpc.promote(inputVal.getByteArray(), inputVal.getStartOffset() + 1, inputVal.getLength() - 1,
+                                tempValForCasting);
+                    } catch (IOException e) {
+                        throw new AlgebricksException(e);
+                    }
+                    if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(),
+                            tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(),
+                            outputVal.getLength()) < 0) {
+                        outputVal.assign(tempValForCasting);
+                    }
+                } else {
+                    if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+                            outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+                        outputVal.assign(inputVal);
+                    }
+                }
+
+            }
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            switch (aggType) {
+                case NULL: {
+                    out.writeByte(ATypeTag.NULL.serialize());
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    finishSystemNull();
+                    break;
+                }
+                default: {
+                    out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    protected boolean skipStep() {
+        return false;
+    }
+
+    protected abstract void processNull();
+
+    protected abstract void processSystemNull() throws AlgebricksException;
+
+    protected abstract void finishSystemNull() throws IOException;
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
new file mode 100644
index 0000000..1332dd5
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunction {
+    protected DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private double sum;
+    protected ATypeTag aggType;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    @SuppressWarnings("rawtypes")
+    protected ISerializerDeserializer serde;
+
+    public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider)
+            throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        sum = 0.0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        if (skipStep()) {
+            return;
+        }
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag
+                    + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
+        }
+
+        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                break;
+            }
+            case SYSTEM_NULL: {
+                processSystemNull();
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            switch (aggType) {
+                case INT8: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+                    aInt8.setValue((byte) sum);
+                    serde.serialize(aInt8, out);
+                    break;
+                }
+                case INT16: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+                    aInt16.setValue((short) sum);
+                    serde.serialize(aInt16, out);
+                    break;
+                }
+                case INT32: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+                    aInt32.setValue((int) sum);
+                    serde.serialize(aInt32, out);
+                    break;
+                }
+                case INT64: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+                    aInt64.setValue((long) sum);
+                    serde.serialize(aInt64, out);
+                    break;
+                }
+                case FLOAT: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+                    aFloat.setValue((float) sum);
+                    serde.serialize(aFloat, out);
+                    break;
+                }
+                case DOUBLE: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    aDouble.setValue(sum);
+                    serde.serialize(aDouble, out);
+                    break;
+                }
+                case NULL: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    finishSystemNull();
+                    break;
+                }
+                default:
+                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
+                            + aggType + "). ");
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    protected boolean skipStep() {
+        return false;
+    }
+    protected abstract void processNull();
+    protected abstract void processSystemNull() throws AlgebricksException;
+    protected abstract void finishSystemNull() throws IOException;
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index 7815606..3ff836a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -15,51 +15,16 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class AvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -78,165 +43,13 @@
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType tmpRecType;
-        try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = tmpRecType;
-
         return new ICopyAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private double sum;
-                    private long count;
-                    private ATypeTag aggType;
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval = new ClosedRecordConstructorEval(recType,
-                            new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, out);
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init() throws AlgebricksException {
-                        aggType = ATypeTag.SYSTEM_NULL;
-                        sum = 0.0;
-                        count = 0;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-                            aggType = ATypeTag.NULL;
-                            return;
-                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-                            aggType = typeTag;
-                        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-                            throw new AlgebricksException("Unexpected type " + typeTag
-                                    + " in aggregation input stream. Expected type " + aggType + ".");
-                        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                            aggType = typeTag;
-                        }
-
-                        if (typeTag != ATypeTag.SYSTEM_NULL) {
-                            ++count;
-                        }
-                        switch (typeTag) {
-                            case INT8: {
-                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT16: {
-                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT32: {
-                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT64: {
-                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case FLOAT: {
-                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case DOUBLE: {
-                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case NULL: {
-                                break;
-                            }
-                            default: {
-                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (count == 0 || aggType == ATypeTag.NULL) {
-                                nullSerde.serialize(ANull.NULL, out);
-                            } else {
-                                aDouble.setValue(sum / count);
-                                doubleSerde.serialize(aDouble, out);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        if (count == 0) {
-                            if (GlobalConfig.DEBUG) {
-                                GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
-                            }
-                        } else {
-                            try {
-                                if (aggType == ATypeTag.NULL) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(sum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt64.setValue(count);
-                                intSerde.serialize(aInt64, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
-                            }
-                        }
-                    }
-                };
+                return new AvgAggregateFunction(args, provider);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
new file mode 100644
index 0000000..2470e5a
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class AvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public AvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
index c63ab62..7c97fc2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
@@ -14,70 +14,21 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
  */
-public class CountAggregateFunction implements ICopyAggregateFunction {
-    private AMutableInt64 result = new AMutableInt64(-1);
-    @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
-    private long cnt;
-    private DataOutput out;
+public class CountAggregateFunction extends AbstractCountAggregateFunction {
 
     public CountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
-        eval = args[0].createEvaluator(inputVal);
-        out = output.getDataOutput();
+        super(args, output);
     }
 
-    @Override
-    public void init() {
-        cnt = 0;
+    protected void processNull() {
+        cnt++;
     }
 
-    @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        // Ignore SYSTEM_NULL.
-        if (typeTag != ATypeTag.SYSTEM_NULL) {
-            cnt++;
-        }
-    }
-
-    @Override
-    public void finish() throws AlgebricksException {
-        try {
-            result.setValue(cnt);
-            int64Serde.serialize(result, out);
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
-    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
index eb5d6bd..230e06b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
@@ -15,45 +15,16 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class GlobalAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -73,142 +44,13 @@
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType tmpRecType;
-        try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = tmpRecType;
-
         return new ICopyAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private double globalSum;
-                    private long globalCount;
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval = new ClosedRecordConstructorEval(recType,
-                            new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, out);
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private boolean metNull;
-
-                    @Override
-                    public void init() {
-                        globalSum = 0.0;
-                        globalCount = 0;
-                        metNull = false;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        byte[] serBytes = inputVal.getByteArray();
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
-                        switch (typeTag) {
-                            case NULL: {
-                                metNull = true;
-                                break;
-                            }
-                            case SYSTEM_NULL: {
-                                // Ignore and return.
-                                return;
-                            }
-                            case RECORD: {
-                                // Expected.
-                                break;
-                            }
-                            default: {
-                                throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
-                            }
-                        }
-
-                        // The record length helps us determine whether the input record fields are nullable.
-                        int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
-                        int nullBitmapSize = 1;
-                        if (recordLength == 29) {
-                            nullBitmapSize = 0;
-                        }
-                        int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize,
-                                false);
-                        if (offset1 == 0) // the sum is null
-                            metNull = true;
-                        else
-                            globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
-                        int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize,
-                                false);
-                        if (offset2 != 0) // the count is not null
-                            globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (globalCount == 0 || metNull)
-                                nullSerde.serialize(ANull.NULL, out);
-                            else {
-                                aDouble.setValue(globalSum / globalCount);
-                                doubleSerde.serialize(aDouble, out);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        try {
-                            if (metNull || globalCount == 0) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(globalSum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(globalCount);
-                            longSerde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-                };
+                return new GlobalAvgAggregateFunction(args, provider);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..a63d4bc
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GlobalAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public GlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processPartialResults(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..0808e81
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class GlobalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new GlobalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.GLOBAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new GlobalSqlAvgAggregateFunction(args, provider);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..02c3c2c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GlobalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public GlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processPartialResults(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
index 09a659c..fd98764 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
@@ -15,50 +15,16 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class LocalAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -77,157 +43,13 @@
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
-
         return new ICopyAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                List<IAType> unionList = new ArrayList<IAType>();
-                unionList.add(BuiltinType.ANULL);
-                unionList.add(BuiltinType.ADOUBLE);
-                ARecordType tmpRecType;
-                try {
-                    tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                            new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
-                } catch (AsterixException e) {
-                    throw new AlgebricksException(e);
-                }
-
-                final ARecordType recType = tmpRecType;
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private ATypeTag aggType;
-                    private double sum;
-                    private long count;
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval = new ClosedRecordConstructorEval(recType,
-                            new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, out);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    @Override
-                    public void init() {
-                        aggType = ATypeTag.SYSTEM_NULL;
-                        sum = 0.0;
-                        count = 0;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-                            aggType = ATypeTag.NULL;
-                            return;
-                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-                            aggType = typeTag;
-                        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-                            throw new AlgebricksException("Unexpected type " + typeTag
-                                    + " in aggregation input stream. Expected type " + aggType + ".");
-                        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                            aggType = typeTag;
-                        }
-
-                        if (typeTag != ATypeTag.SYSTEM_NULL) {
-                            ++count;
-                        }
-
-                        switch (typeTag) {
-                            case INT8: {
-                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT16: {
-                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT32: {
-                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT64: {
-                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case FLOAT: {
-                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case DOUBLE: {
-                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case NULL: {
-                                break;
-                            }
-                            default: {
-                                throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type "
-                                        + typeTag);
-                            }
-                        }
-                        inputVal.reset();
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (count == 0 && aggType != ATypeTag.NULL) {
-                                out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                                return;
-                            }
-                            if (aggType == ATypeTag.NULL) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(sum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(count);
-                            longSerde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        finish();
-                    }
-                };
+                return new LocalAvgAggregateFunction(args, provider);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
new file mode 100644
index 0000000..2c7abd4
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class LocalAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public LocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
index 9e411b6..c540c43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..6a1f60e
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.LOCAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new LocalSqlAvgAggregateFunction(args, provider);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..6a97410
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class LocalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public LocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..6b31244
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_MAX;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, false, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..4946703
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_MIN;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, true, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..1f47727
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_SUM;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlSumAggregateFunction(args, provider, true);
+            };
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
index 3e0b65c..0e35cd3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
index 8599913..0809701 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
@@ -14,164 +14,46 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
-import edu.uci.ics.asterix.om.types.hierachy.ITypePromoteComputer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class MinMaxAggregateFunction implements ICopyAggregateFunction {
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
-    private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
-    private DataOutput out;
-    private ICopyEvaluator eval;
-    private ATypeTag aggType;
-    private IBinaryComparator cmp;
-    private ITypePromoteComputer tpc;
-    private final boolean isMin;
+public class MinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
     private final boolean isLocalAgg;
 
     public MinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
             boolean isLocalAgg) throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
-        this.isMin = isMin;
+        super(args, provider, isMin);
         this.isLocalAgg = isLocalAgg;
     }
 
-    @Override
-    public void init() {
-        aggType = ATypeTag.SYSTEM_NULL;
-        outputVal.reset();
-        tempValForCasting.reset();
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
     }
 
     @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
         }
-        if (aggType == ATypeTag.SYSTEM_NULL) {
-            if (typeTag == ATypeTag.SYSTEM_NULL) {
-                // Ignore.
-                return;
-            }
-            // First value encountered. Set type, comparator, and initial value.
-            aggType = typeTag;
-            // Set comparator.
-            IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
-                    .getBinaryComparatorFactory(aggType, isMin);
-            cmp = cmpFactory.createBinaryComparator();
-            // Initialize min value.
-            outputVal.assign(inputVal);
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
-                    + aggType + ".");
+    }
+
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
         } else {
-
-            // If a system_null is encountered locally, it would be an error; otherwise if it is seen
-            // by a global aggregator, it is simple ignored.
-            if (typeTag == ATypeTag.SYSTEM_NULL) {
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                } else {
-                    return;
-                }
-            }
-
-            if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
-                aggType = typeTag;
-                cmp = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
-                        .createBinaryComparator();
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.promote(outputVal.getByteArray(), outputVal.getStartOffset() + 1,
-                                outputVal.getLength() - 1, tempValForCasting);
-                    } catch (IOException e) {
-                        throw new AlgebricksException(e);
-                    }
-                    outputVal.reset();
-                    outputVal.assign(tempValForCasting);
-                }
-                if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
-                        outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                    outputVal.assign(inputVal);
-                }
-
-            } else {
-                tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.promote(inputVal.getByteArray(), inputVal.getStartOffset() + 1, inputVal.getLength() - 1,
-                                tempValForCasting);
-                    } catch (IOException e) {
-                        throw new AlgebricksException(e);
-                    }
-                    if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(),
-                            tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(),
-                            outputVal.getLength()) < 0) {
-                        outputVal.assign(tempValForCasting);
-                    }
-                } else {
-                    if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
-                            outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                        outputVal.assign(inputVal);
-                    }
-                }
-
-            }
+            out.writeByte(ATypeTag.NULL.serialize());
         }
     }
 
-    @Override
-    public void finish() throws AlgebricksException {
-        try {
-            switch (aggType) {
-                case NULL: {
-                    out.writeByte(ATypeTag.NULL.serialize());
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        out.writeByte(ATypeTag.NULL.serialize());
-                    }
-                    break;
-                }
-                default: {
-                    out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
-                    break;
-                }
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
-    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..cf3f3dd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_AVG;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlAvgAggregateFunction(args, provider);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
new file mode 100644
index 0000000..11cf13f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public SqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..b484b53
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+/**
+ * NULLs are also counted.
+ */
+public class SqlCountAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_COUNT;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlCountAggregateFunction(args, provider);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java
new file mode 100644
index 0000000..5fb5d72
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+/**
+ * COUNT returns the number of non-null items in the given list. Note that COUNT(NULL) is not allowed.
+ */
+public class SqlCountAggregateFunction extends AbstractCountAggregateFunction {
+
+    public SqlCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..c8e415f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_MAX;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, false, false);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..fe934f8
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_MIN;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, true, false);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
new file mode 100644
index 0000000..56236e3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
+    private final boolean isLocalAgg;
+
+    public SqlMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
+            boolean isLocalAgg) throws AlgebricksException {
+        super(args, provider, isMin);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    protected void processNull() {
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            out.writeByte(ATypeTag.NULL.serialize());
+        }
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..294d9c1
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_SUM;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlSumAggregateFunction(args, provider, false);
+            };
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
new file mode 100644
index 0000000..4418288
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlSumAggregateFunction extends AbstractSumAggregateFunction {
+    private final boolean isLocalAgg;
+    
+    public SqlSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+            throws AlgebricksException {
+        super(args, provider);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    protected void processNull() {
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
+        }
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 31ad055..42b0346 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -14,202 +14,54 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
 import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SumAggregateFunction implements ICopyAggregateFunction {
-    private DataOutput out;
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
-    private double sum;
-    private ATypeTag aggType;
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableFloat aFloat = new AMutableFloat(0);
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-    @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer serde;
-
+public class SumAggregateFunction extends AbstractSumAggregateFunction {
     private final boolean isLocalAgg;
 
     public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
             throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
+        super(args, provider);
         this.isLocalAgg = isLocalAgg;
     }
 
     @Override
-    public void init() {
-        aggType = ATypeTag.SYSTEM_NULL;
-        sum = 0.0;
+    protected boolean skipStep() { 
+        return (aggType == ATypeTag.NULL);
     }
 
     @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
-        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-            aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-            throw new AlgebricksException("Unexpected type " + typeTag
-                    + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
-        }
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
 
-        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-            aggType = typeTag;
-        }
-
-        switch (typeTag) {
-            case INT8: {
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT16: {
-                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT32: {
-                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT64: {
-                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case FLOAT: {
-                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case DOUBLE: {
-                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case NULL: {
-                break;
-            }
-            case SYSTEM_NULL: {
-                // For global aggregates simply ignore system null here,
-                // but if all input value are system null, then we should return
-                // null in finish().
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                }
-                break;
-            }
-            default: {
-                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
-            }
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
         }
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void finish() throws AlgebricksException {
-        try {
-            switch (aggType) {
-                case INT8: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
-                    aInt8.setValue((byte) sum);
-                    serde.serialize(aInt8, out);
-                    break;
-                }
-                case INT16: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
-                    aInt16.setValue((short) sum);
-                    serde.serialize(aInt16, out);
-                    break;
-                }
-                case INT32: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-                    aInt32.setValue((int) sum);
-                    serde.serialize(aInt32, out);
-                    break;
-                }
-                case INT64: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-                    aInt64.setValue((long) sum);
-                    serde.serialize(aInt64, out);
-                    break;
-                }
-                case FLOAT: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
-                    aFloat.setValue((float) sum);
-                    serde.serialize(aFloat, out);
-                    break;
-                }
-                case DOUBLE: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    aDouble.setValue(sum);
-                    serde.serialize(aDouble, out);
-                    break;
-                }
-                case NULL: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                        serde.serialize(ANull.NULL, out);
-                    }
-                    break;
-                }
-                default:
-                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
-                            + aggType + "). ");
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
         }
     }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
-    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index ea75c77..5ae2459 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -65,22 +65,43 @@
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.SumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.stream.EmptyStreamAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.stream.NonEmptyStreamAggregateDescriptor;
@@ -426,6 +447,33 @@
         temp.add(ScalarMaxAggregateDescriptor.FACTORY);
         temp.add(ScalarMinAggregateDescriptor.FACTORY);
 
+        // SQL aggregates
+        temp.add(SqlCountAggregateDescriptor.FACTORY);
+        temp.add(SqlAvgAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(GlobalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SqlSumAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlSumAggregateDescriptor.FACTORY);
+        temp.add(SqlMaxAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlMaxAggregateDescriptor.FACTORY);
+        temp.add(SqlMinAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlMinAggregateDescriptor.FACTORY);
+
+        // SQL serializable aggregates
+        temp.add(SerializableSqlCountAggregateDescriptor.FACTORY);
+        temp.add(SerializableSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableLocalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableGlobalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableSqlSumAggregateDescriptor.FACTORY);
+        temp.add(SerializableLocalSqlSumAggregateDescriptor.FACTORY);
+
+        // SQL scalar aggregates
+        temp.add(ScalarSqlCountAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlSumAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlMaxAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlMinAggregateDescriptor.FACTORY);
+
         // new functions - constructors
         temp.add(ABooleanConstructorDescriptor.FACTORY);
         temp.add(ANullConstructorDescriptor.FACTORY);