Merge branch 'master' into ecarm002/aggregate_sql_functions

* master: (23 commits)
  Fixed a deadlock when stopping the dataset life cycle manager.
  remove debug logging
  move print methods to the 'debugging section'
  generate methods to print a record as JSON
  updated the documentation for the transaction model on behalf of youngseok
  add unrestricted REST endpoint
  remove useless gitignores that Eclipse generates
  modified snapshot version of fuzzy join, fixed issue 703
  adding fuzzyjoin code to git
  disable debugging checks fix LockManagerDeterministicUnitTest some cleanup
  some more cleanup (mostly removal of debug/logging code)
  fix typo
  some cleanup
  remove duplicate
  revert temporary changes
  better attempt to fix the bug minor cleanup
  a fix and a bunch of debugging changes
  fix construction with allocId
  add DeadlockTracker
  temp change to avoid deleting the instance after the test run
  ...
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 299bbfb..8f0bab3 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -304,6 +304,57 @@
     public final static FunctionIdentifier SERIAL_LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "local-avg-serial", 1);
 
+    // sql aggregate functions
+    public final static FunctionIdentifier SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-avg", 1);
+    public final static FunctionIdentifier SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-count", 1);
+    public final static FunctionIdentifier SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-sum", 1);
+    public final static FunctionIdentifier LOCAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-sum", 1);
+    public final static FunctionIdentifier SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-max", 1);
+    public final static FunctionIdentifier LOCAL_SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-max", 1);
+    public final static FunctionIdentifier SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-min", 1);
+    public final static FunctionIdentifier LOCAL_SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-min", 1);
+    public final static FunctionIdentifier GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-global-sql-avg", 1);
+    public final static FunctionIdentifier LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-avg", 1);
+
+    public final static FunctionIdentifier SCALAR_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-avg", 1);
+    public final static FunctionIdentifier SCALAR_SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-count", 1);
+    public final static FunctionIdentifier SCALAR_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-sum", 1);
+    public final static FunctionIdentifier SCALAR_SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-max", 1);
+    public final static FunctionIdentifier SCALAR_SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-min", 1);
+    public final static FunctionIdentifier SCALAR_GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "global-sql-avg", 1);
+    public final static FunctionIdentifier SCALAR_LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sql-avg", 1);
+
+    // serializable sql aggregate functions
+    public final static FunctionIdentifier SERIAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-avg-serial", 1);
+    public final static FunctionIdentifier SERIAL_SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-count-serial", 1);
+    public final static FunctionIdentifier SERIAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-sum-serial", 1);
+    public final static FunctionIdentifier SERIAL_LOCAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sql-sum-serial", 1);
+    public final static FunctionIdentifier SERIAL_GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "global-sql-avg-serial", 1);
+    public final static FunctionIdentifier SERIAL_LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sql-avg-serial", 1);
+
     public final static FunctionIdentifier SCAN_COLLECTION = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "scan-collection", 1);
     public final static FunctionIdentifier SUBSET_COLLECTION = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -739,6 +790,29 @@
         addFunction(RANGE, AInt32TypeComputer.INSTANCE, true);
         addFunction(RECTANGLE_CONSTRUCTOR, OptionalARectangleTypeComputer.INSTANCE, true);
 
+        // SQL Aggregate Functions
+        addFunction(SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+        addFunction(SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+        addFunction(SQL_MAX, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_MAX, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addFunction(SQL_MIN, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_MIN, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addFunction(SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+        addFunction(SCALAR_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(SCALAR_GLOBAL_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addPrivateFunction(SCALAR_LOCAL_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_MAX, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_MIN, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_SUM, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addPrivateFunction(SERIAL_SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_GLOBAL_SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_LOCAL_SQL_AVG, NonTaggedLocalAvgTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_LOCAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+
         addFunction(SCALAR_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
         addFunction(SCALAR_COUNT, AInt64TypeComputer.INSTANCE, true);
         addPrivateFunction(SCALAR_GLOBAL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
@@ -913,6 +987,16 @@
         scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_MAX), getAsterixFunctionInfo(MAX));
         scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_MIN), getAsterixFunctionInfo(MIN));
         scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SUM), getAsterixFunctionInfo(SUM));
+        // SQL Aggregate Functions
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_AVG), getAsterixFunctionInfo(SQL_AVG));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_COUNT), getAsterixFunctionInfo(SQL_COUNT));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_GLOBAL_SQL_AVG),
+                getAsterixFunctionInfo(GLOBAL_SQL_AVG));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_LOCAL_SQL_AVG),
+                getAsterixFunctionInfo(LOCAL_SQL_AVG));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_MAX), getAsterixFunctionInfo(SQL_MAX));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_MIN), getAsterixFunctionInfo(SQL_MIN));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_SUM), getAsterixFunctionInfo(SQL_SUM));
     }
 
     static {
@@ -964,6 +1048,55 @@
         addAgg(SERIAL_LOCAL_SUM);
         addLocalAgg(SERIAL_SUM, SERIAL_LOCAL_SUM);
         addGlobalAgg(SERIAL_SUM, SERIAL_SUM);
+
+        // SQL Aggregate Functions
+        addAgg(SQL_AVG);
+        addAgg(LOCAL_SQL_AVG);
+        addAgg(GLOBAL_SQL_AVG);
+        addLocalAgg(SQL_AVG, LOCAL_SQL_AVG);
+        addGlobalAgg(SQL_AVG, GLOBAL_SQL_AVG);
+
+        addAgg(SQL_COUNT);
+        addLocalAgg(SQL_COUNT, SQL_COUNT);
+        addGlobalAgg(SQL_COUNT, SQL_SUM);
+
+        addAgg(SQL_MAX);
+        addAgg(LOCAL_SQL_MAX);
+        addLocalAgg(SQL_MAX, LOCAL_SQL_MAX);
+        addGlobalAgg(SQL_MAX, SQL_MAX);
+
+        addAgg(SQL_MIN);
+        addLocalAgg(SQL_MIN, LOCAL_SQL_MIN);
+        addGlobalAgg(SQL_MIN, SQL_MIN);
+
+        addAgg(SQL_SUM);
+        addAgg(LOCAL_SQL_SUM);
+        addLocalAgg(SQL_SUM, LOCAL_SQL_SUM);
+        addGlobalAgg(SQL_SUM, SQL_SUM);
+
+        // SQL serializable aggregate functions
+        addSerialAgg(SQL_AVG, SERIAL_SQL_AVG);
+        addSerialAgg(SQL_COUNT, SERIAL_SQL_COUNT);
+        addSerialAgg(SQL_SUM, SERIAL_SQL_SUM);
+        addSerialAgg(LOCAL_SQL_SUM, SERIAL_LOCAL_SQL_SUM);
+        addSerialAgg(LOCAL_SQL_AVG, SERIAL_LOCAL_SQL_AVG);
+        addSerialAgg(GLOBAL_SQL_AVG, SERIAL_GLOBAL_SQL_AVG);
+
+        addAgg(SERIAL_SQL_COUNT);
+        addLocalAgg(SERIAL_SQL_COUNT, SERIAL_SQL_COUNT);
+        addGlobalAgg(SERIAL_SQL_COUNT, SERIAL_SQL_SUM);
+
+        addAgg(SERIAL_SQL_AVG);
+        addAgg(SERIAL_LOCAL_SQL_AVG);
+        addAgg(SERIAL_GLOBAL_SQL_AVG);
+        addLocalAgg(SERIAL_SQL_AVG, SERIAL_LOCAL_SQL_AVG);
+        addGlobalAgg(SERIAL_SQL_AVG, SERIAL_GLOBAL_SQL_AVG);
+
+        addAgg(SERIAL_SQL_SUM);
+        addAgg(SERIAL_LOCAL_SQL_SUM);
+        addLocalAgg(SERIAL_SQL_SUM, SERIAL_LOCAL_SQL_SUM);
+        addGlobalAgg(SERIAL_SQL_SUM, SERIAL_SQL_SUM);
+
     }
 
     static {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..241ee46
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlAvgAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_AVG;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..1b581e4
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlCountAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_COUNT;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..f58d578
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMaxAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_MAX;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..da6e38d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMinAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_MIN;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..14aa6b7
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlSumAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_SUM;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
new file mode 100644
index 0000000..4d804ee
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSerializableAvgAggregateFunction implements ICopySerializableAggregateFunction {
+    private static final int SUM_FIELD_ID = 0;
+    private static final int COUNT_FIELD_ID = 1;
+
+    private static final int SUM_OFFSET = 0;
+    private static final int COUNT_OFFSET = 8;
+    protected static final int AGG_TYPE_OFFSET = 16;
+
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+    private ClosedRecordConstructorEval recordEval;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+
+    public AbstractSerializableAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+    }
+
+    @Override
+    public void init(DataOutput state) throws AlgebricksException {
+        try {
+            state.writeDouble(0.0);
+            state.writeLong(0);
+            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public abstract void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException;
+
+    public abstract void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+    public abstract void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+    protected abstract void processNull(byte[] state, int start);
+
+    protected void processDataValues(IFrameTupleReference tuple, byte[] state, int start, int len)
+            throws AlgebricksException {
+        if (skipStep(state, start)) {
+            return;
+        }
+        inputVal.reset();
+        eval.evaluate(tuple);
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull(state, start);
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+        ++count;
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+            }
+        }
+        inputVal.reset();
+        BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+        BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+    }
+
+    protected void finishPartialResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        if (recordEval == null) {
+            ARecordType recType;
+            try {
+                recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+                        BuiltinType.AINT64 }, false);
+            } catch (AsterixException e) {
+                throw new AlgebricksException(e);
+            }
+            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount },
+                    avgBytes, result);
+        }
+
+        try {
+            if (aggType == ATypeTag.SYSTEM_NULL) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                }
+                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+            } else if (aggType == ATypeTag.NULL) {
+                result.writeByte(ATypeTag.NULL.serialize());
+            } else {
+                sumBytes.reset();
+                aDouble.setValue(sum);
+                doubleSerde.serialize(aDouble, sumBytesOutput);
+                countBytes.reset();
+                aInt64.setValue(count);
+                longSerde.serialize(aInt64, countBytesOutput);
+                recordEval.evaluate(null);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    protected void processPartialResults(IFrameTupleReference tuple, byte[] state, int start, int len)
+            throws AlgebricksException {
+        if (skipStep(state, start)) {
+            return;
+        }
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+        inputVal.reset();
+        eval.evaluate(tuple);
+        byte[] serBytes = inputVal.getByteArray();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        switch (typeTag) {
+            case NULL: {
+                processNull(state, start);
+                break;
+            }
+            case SYSTEM_NULL: {
+                // Ignore and return.
+                break;
+            }
+            case RECORD: {
+                // Expected.
+                int nullBitmapSize = 0;
+                int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+                        false);
+                sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+                int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+                        nullBitmapSize, false);
+                count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+
+                BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+                BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+                state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+                break;
+            }
+            default: {
+                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+            }
+        }
+    }
+
+    protected void finishFinalResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+        try {
+            if (count == 0 || aggType == ATypeTag.NULL)
+                nullSerde.serialize(ANull.NULL, result);
+            else {
+                aDouble.setValue(sum / count);
+                doubleSerde.serialize(aDouble, result);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
new file mode 100644
index 0000000..71f709f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public abstract class AbstractSerializableCountAggregateFunction implements ICopySerializableAggregateFunction {
+    private static final int MET_NULL_OFFSET = 0;
+    private static final int COUNT_OFFSET = 1;
+
+    private AMutableInt64 result = new AMutableInt64(-1);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+
+    public AbstractSerializableCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+    }
+
+    @Override
+    public void init(DataOutput state) throws AlgebricksException {
+        try {
+            state.writeBoolean(false);
+            state.writeLong(0);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+        long cnt = BufferSerDeUtil.getLong(state, start + 1);
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull(state, start);
+        } else {
+            cnt++;
+        }
+        BufferSerDeUtil.writeBoolean(metNull, state, start + MET_NULL_OFFSET);
+        BufferSerDeUtil.writeLong(cnt, state, start + COUNT_OFFSET);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+        long cnt = BufferSerDeUtil.getLong(state, start + 1);
+        try {
+            if (metNull) {
+                nullSerde.serialize(ANull.NULL, out);
+            } else {
+                result.setValue(cnt);
+                int64Serde.serialize(result, out);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        finish(state, start, len, out);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        BufferSerDeUtil.writeBoolean(true, state, start + MET_NULL_OFFSET);
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
new file mode 100644
index 0000000..68c5b43
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+    protected static final int AGG_TYPE_OFFSET = 0;
+    private static final int SUM_OFFSET = 1;
+
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    @SuppressWarnings("rawtypes")
+    public ISerializerDeserializer serde;
+
+    public AbstractSerializableSumAggregateFunction(ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+    }
+
+    @Override
+    public void init(DataOutput state) throws AlgebricksException {
+        try {
+            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+            state.writeDouble(0.0);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        if (skipStep(state, start)) {
+            return;
+        }
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull(state, start);
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag
+                    + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
+        }
+
+        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                aggType = typeTag;
+                break;
+            }
+            case SYSTEM_NULL: {
+                processSystemNull();
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+            }
+        }
+        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+        BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        try {
+            switch (aggType) {
+                case INT8: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+                    aInt8.setValue((byte) sum);
+                    serde.serialize(aInt8, out);
+                    break;
+                }
+                case INT16: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+                    aInt16.setValue((short) sum);
+                    serde.serialize(aInt16, out);
+                    break;
+                }
+                case INT32: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+                    aInt32.setValue((int) sum);
+                    serde.serialize(aInt32, out);
+                    break;
+                }
+                case INT64: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+                    aInt64.setValue((long) sum);
+                    serde.serialize(aInt64, out);
+                    break;
+                }
+                case FLOAT: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+                    aFloat.setValue((float) sum);
+                    serde.serialize(aFloat, out);
+                    break;
+                }
+                case DOUBLE: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    aDouble.setValue(sum);
+                    serde.serialize(aDouble, out);
+                    break;
+                }
+                case NULL: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    finishSystemNull(out);
+                    break;
+                }
+                default:
+                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
+                            + aggType + "). ");
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        finish(state, start, len, out);
+    }
+
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
+    }
+    protected abstract void processNull(byte[] state, int start);
+    protected abstract void processSystemNull() throws AlgebricksException;
+    protected abstract void finishSystemNull(DataOutput out) throws IOException;
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
index 72e0045..a215a07 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
@@ -14,37 +14,15 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -62,129 +40,15 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
+            @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeBoolean(false);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-                        if (inputVal.getLength() > 0) {
-                            ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute AVG for values of type "
-                                            + typeTag);
-                                }
-                            }
-                            inputVal.reset();
-                        }
-                        BufferSerDeUtil.writeDouble(sum, state, start);
-                        BufferSerDeUtil.writeLong(count, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        if (count == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, result);
-                                else {
-                                    aDouble.setValue(sum / count);
-                                    doubleSerde.serialize(aDouble, result);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] data, int start, int len, DataOutput partialResult)
-                            throws AlgebricksException {
-                        try {
-                            partialResult.write(data, start, len);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                };
+                return new SerializableAvgAggregateFunction(args);
             }
         };
     }
+
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
new file mode 100644
index 0000000..202933d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processDataValues(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
index a9b264c..0c9ce06 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
@@ -14,29 +14,15 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * count(NULL) returns NULL.
@@ -63,68 +49,7 @@
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-                    private AMutableInt64 result = new AMutableInt64(-1);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeBoolean(false);
-                            state.writeLong(0);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
-                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        if (typeTag == ATypeTag.NULL) {
-                            metNull = true;
-                        } else {
-                            cnt++;
-                        }
-                        BufferSerDeUtil.writeBoolean(metNull, state, start);
-                        BufferSerDeUtil.writeLong(cnt, state, start + 1);
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
-                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
-                        try {
-                            if (metNull) {
-                                nullSerde.serialize(ANull.NULL, out);
-                            } else {
-                                result.setValue(cnt);
-                                int64Serde.serialize(result, out);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput out)
-                            throws AlgebricksException {
-                        finish(state, start, len, out);
-                    }
-                };
+                return new SerializableCountAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java
new file mode 100644
index 0000000..7a6aacd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
+    public SerializableCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index f720434..ccfeb43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -15,44 +15,15 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableGlobalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -70,154 +41,13 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType _recType;
-        try {
-            _recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = _recType;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval;
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeBoolean(false);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        byte[] serBytes = inputVal.getByteArray();
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
-                        switch (typeTag) {
-                            case NULL: {
-                                metNull = true;
-                                break;
-                            }
-                            case SYSTEM_NULL: {
-                                // Ignore and return.
-                                return;
-                            }
-                            case RECORD: {
-                                // Expected.
-                                break;
-                            }
-                            default: {
-                                throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
-                            }
-                        }
-                        int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
-                        if (offset1 == 0) // the sum is null
-                            metNull = true;
-                        else
-                            globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
-                        int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, 1, true);
-                        if (offset2 != 0) // the count is not null
-                            globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
-                        BufferSerDeUtil.writeDouble(globalSum, state, start);
-                        BufferSerDeUtil.writeLong(globalCount, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        try {
-                            if (globalCount == 0 || metNull)
-                                nullSerde.serialize(ANull.NULL, result);
-                            else {
-                                aDouble.setValue(globalSum / globalCount);
-                                doubleSerde.serialize(aDouble, result);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput result)
-                            throws AlgebricksException {
-                        double globalSum = BufferSerDeUtil.getDouble(state, start);
-                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
-                        if (recordEval == null)
-                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
-                                    evalCount }, avgBytes, result);
-
-                        try {
-                            if (globalCount == 0 || metNull) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(globalSum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(globalCount);
-                            longSerde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-                };
+                return new SerializableGlobalAvgAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..c6d4b5a
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableGlobalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..7fa6b9c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableGlobalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_GLOBAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableGlobalSqlAvgAggregateFunction(args);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..cc5043f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index 219204b..c015682 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -15,48 +15,15 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableLocalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -74,163 +41,12 @@
 
     @Override
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
-            ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        final ICopyEvaluatorFactory[] evals = args;
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType tmpRecType;
-        try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = tmpRecType;
-
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-                return new ICopySerializableAggregateFunction() {
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-                    private ClosedRecordConstructorEval recordEval;
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeDouble(0.0);
-                            state.writeLong(0);
-                            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
-                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-                            aggType = ATypeTag.NULL;
-                            return;
-                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-                            aggType = typeTag;
-                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
-                            throw new AlgebricksException("Unexpected type " + typeTag
-                                    + " in aggregation input stream. Expected type " + aggType + ".");
-                        }
-                        ++count;
-                        switch (typeTag) {
-                            case INT8: {
-                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT16: {
-                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT32: {
-                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT64: {
-                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case FLOAT: {
-                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case DOUBLE: {
-                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case NULL: {
-                                break;
-                            }
-                            default: {
-                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
-                            }
-                        }
-                        BufferSerDeUtil.writeDouble(sum, state, start);
-                        BufferSerDeUtil.writeLong(count, state, start + 8);
-                        state[start + 16] = aggType.serialize();
-                    }
-
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
-                        double sum = BufferSerDeUtil.getDouble(state, start);
-                        long count = BufferSerDeUtil.getLong(state, start + 8);
-                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
-                        if (recordEval == null) {
-                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
-                                    evalCount }, avgBytes, result);
-                        }
-                        try {
-                            if (count == 0) {
-                                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                                return;
-                            }
-                            if (aggType == ATypeTag.NULL) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(sum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(count);
-                            int64Serde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput partialResult)
-                            throws AlgebricksException {
-                        finish(state, start, len, partialResult);
-                    }
-
-                };
+                return new SerializableLocalAvgAggregateFunction(args);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
new file mode 100644
index 0000000..00835d7
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableLocalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+    }
+
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..58415d3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableLocalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableLocalSqlAvgAggregateFunction(args);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..0caeadf
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processPartialResults(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishPartialResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..2ce85c6
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableLocalSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_SUM;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSqlSumAggregateFunction(args, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
index 3f3aaff..bd6ead8 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -27,7 +27,6 @@
 public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableLocalSumAggregateDescriptor();
@@ -36,7 +35,7 @@
 
     @Override
     public FunctionIdentifier getIdentifier() {
-        return FID;
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
     }
 
     @Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..786510b
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSqlAvgAggregateFunction(args);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..c1f77a3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+    public SerializableSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        processDataValues(tuple, state, start, len);
+    }
+
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finishFinalResults(state, start, len, result);
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+        finish(state, start, len, result);
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..5bfd6f2
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableSqlCountAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableSqlCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_SQL_COUNT;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSqlCountAggregateFunction(args);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
new file mode 100644
index 0000000..525b643
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableSqlCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
+    public SerializableSqlCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        super(args);
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..c68b3b3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_SQL_SUM;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSqlSumAggregateFunction(args, false);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
new file mode 100644
index 0000000..f80ade3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+    private final boolean isLocalAgg;
+
+    public SerializableSqlSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+            throws AlgebricksException {
+        super(args);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    protected void processNull(byte[] state, int start) {
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
+        }
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index bea6ab8..15e546a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -17,196 +17,54 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
 import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableFloat aFloat = new AMutableFloat(0);
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-    @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer serde;
+public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
     private final boolean isLocalAgg;
 
     public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
             throws AlgebricksException {
-        eval = args[0].createEvaluator(inputVal);
+        super(args);
         this.isLocalAgg = isLocalAgg;
     }
 
-    @Override
-    public void init(DataOutput state) throws AlgebricksException {
-        try {
-            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-            state.writeDouble(0.0);
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
+    protected void processNull(byte[] state, int start) {
+        ATypeTag aggType = ATypeTag.NULL;
+        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
     }
 
     @Override
-    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
-        double sum = BufferSerDeUtil.getDouble(state, start + 1);
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
-        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-            aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
-            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
-                    + aggType + ".");
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return (aggType == ATypeTag.NULL);
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
         }
-        switch (typeTag) {
-            case INT8: {
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT16: {
-                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT32: {
-                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT64: {
-                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case FLOAT: {
-                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case DOUBLE: {
-                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case NULL: {
-                aggType = typeTag;
-                break;
-            }
-            case SYSTEM_NULL: {
-                // For global aggregates simply ignore system null here,
-                // but if all input value are system null, then we should return
-                // null in finish().
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                }
-                break;
-            }
-            default: {
-                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
-            }
-        }
-        state[start] = aggType.serialize();
-        BufferSerDeUtil.writeDouble(sum, state, start + 1);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
-        double sum = BufferSerDeUtil.getDouble(state, start + 1);
-        try {
-            switch (aggType) {
-                case INT8: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
-                    aInt8.setValue((byte) sum);
-                    serde.serialize(aInt8, out);
-                    break;
-                }
-                case INT16: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
-                    aInt16.setValue((short) sum);
-                    serde.serialize(aInt16, out);
-                    break;
-                }
-                case INT32: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-                    aInt32.setValue((int) sum);
-                    serde.serialize(aInt32, out);
-                    break;
-                }
-                case INT64: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-                    aInt64.setValue((long) sum);
-                    serde.serialize(aInt64, out);
-                    break;
-                }
-                case FLOAT: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
-                    aFloat.setValue((float) sum);
-                    serde.serialize(aFloat, out);
-                    break;
-                }
-                case DOUBLE: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    aDouble.setValue(sum);
-                    serde.serialize(aDouble, out);
-                    break;
-                }
-                case NULL: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                        serde.serialize(ANull.NULL, out);
-                    }
-                    break;
-                }
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
         }
-
     }
 
-    @Override
-    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-        finish(state, start, len, out);
-    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
new file mode 100644
index 0000000..cfc5f1c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunction {
+    private static final int SUM_FIELD_ID = 0;
+    private static final int COUNT_FIELD_ID = 1;
+
+    private final ARecordType recType;
+
+    private DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    protected ATypeTag aggType;
+    private double sum;
+    private long count;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+    private ClosedRecordConstructorEval recordEval;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+
+    public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        out = output.getDataOutput();
+
+        ARecordType tmpRecType;
+        try {
+            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+                    BuiltinType.AINT64 }, false);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+
+        recType = tmpRecType;
+        recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount }, avgBytes,
+                out);
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        sum = 0.0;
+        count = 0;
+    }
+
+    public abstract void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+    public abstract void finish() throws AlgebricksException;
+
+    public abstract void finishPartial() throws AlgebricksException;
+
+    protected abstract void processNull();
+
+    protected void processDataValues(IFrameTupleReference tuple) throws AlgebricksException {
+        if (skipStep()) {
+            return;
+        }
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+        ++count;
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+            }
+        }
+        inputVal.reset();
+    }
+
+    protected void finishPartialResults() throws AlgebricksException {
+        try {
+            // Double check that count 0 is accounted
+            if (aggType == ATypeTag.SYSTEM_NULL) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                }
+                out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+            } else if (aggType == ATypeTag.NULL) {
+                out.writeByte(ATypeTag.NULL.serialize());
+            } else {
+                sumBytes.reset();
+                aDouble.setValue(sum);
+                doubleSerde.serialize(aDouble, sumBytesOutput);
+                countBytes.reset();
+                aInt64.setValue(count);
+                longSerde.serialize(aInt64, countBytesOutput);
+                recordEval.evaluate(null);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    protected void processPartialResults(IFrameTupleReference tuple) throws AlgebricksException {
+        if (skipStep()) {
+            return;
+        }
+        inputVal.reset();
+        eval.evaluate(tuple);
+        byte[] serBytes = inputVal.getByteArray();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        switch (typeTag) {
+            case NULL: {
+                processNull();
+                break;
+            }
+            case SYSTEM_NULL: {
+                // Ignore and return.
+                break;
+            }
+            case RECORD: {
+                // Expected.
+                int nullBitmapSize = 0;
+                int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+                        false);
+                sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+                int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+                        nullBitmapSize, false);
+                count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+                break;
+            }
+            default: {
+                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+            }
+        }
+    }
+
+    protected void finishFinalResults() throws AlgebricksException {
+        try {
+            if (count == 0 || aggType == ATypeTag.NULL) {
+                nullSerde.serialize(ANull.NULL, out);
+            } else {
+                aDouble.setValue(sum / count);
+                doubleSerde.serialize(aDouble, out);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    protected boolean skipStep() {
+        return false;
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
new file mode 100644
index 0000000..29e57f5
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
+ */
+public abstract class AbstractCountAggregateFunction implements ICopyAggregateFunction {
+    private AMutableInt64 result = new AMutableInt64(-1);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    protected long cnt;
+    private DataOutput out;
+
+    public AbstractCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        out = output.getDataOutput();
+    }
+
+    @Override
+    public void init() {
+        cnt = 0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        // Ignore SYSTEM_NULL.
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+        } else if (typeTag != ATypeTag.SYSTEM_NULL) {
+            cnt++;
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            result.setValue(cnt);
+            int64Serde.serialize(result, out);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    protected abstract void processNull();
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
new file mode 100644
index 0000000..29aba33
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.om.types.hierachy.ITypePromoteComputer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateFunction {
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
+    private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
+    protected DataOutput out;
+    private ICopyEvaluator eval;
+    protected ATypeTag aggType;
+    private IBinaryComparator cmp;
+    private ITypePromoteComputer tpc;
+    private final boolean isMin;
+
+    public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin)
+            throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isMin = isMin;
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        outputVal.reset();
+        tempValForCasting.reset();
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+            return;
+        } else if (aggType == ATypeTag.NULL) {
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            if (typeTag == ATypeTag.SYSTEM_NULL) {
+                // Ignore.
+                return;
+            }
+            // First value encountered. Set type, comparator, and initial value.
+            aggType = typeTag;
+            // Set comparator.
+            IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(aggType, isMin);
+            cmp = cmpFactory.createBinaryComparator();
+            // Initialize min value.
+            outputVal.assign(inputVal);
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else {
+
+            // If a system_null is encountered locally, it would be an error; otherwise if it is seen
+            // by a global aggregator, it is simple ignored.
+            if (typeTag == ATypeTag.SYSTEM_NULL) {
+                processSystemNull();
+                return;
+            }
+
+            if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+                tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
+                aggType = typeTag;
+                cmp = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
+                        .createBinaryComparator();
+                if (tpc != null) {
+                    tempValForCasting.reset();
+                    try {
+                        tpc.promote(outputVal.getByteArray(), outputVal.getStartOffset() + 1,
+                                outputVal.getLength() - 1, tempValForCasting);
+                    } catch (IOException e) {
+                        throw new AlgebricksException(e);
+                    }
+                    outputVal.reset();
+                    outputVal.assign(tempValForCasting);
+                }
+                if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+                        outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+                    outputVal.assign(inputVal);
+                }
+
+            } else {
+                tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
+                if (tpc != null) {
+                    tempValForCasting.reset();
+                    try {
+                        tpc.promote(inputVal.getByteArray(), inputVal.getStartOffset() + 1, inputVal.getLength() - 1,
+                                tempValForCasting);
+                    } catch (IOException e) {
+                        throw new AlgebricksException(e);
+                    }
+                    if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(),
+                            tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(),
+                            outputVal.getLength()) < 0) {
+                        outputVal.assign(tempValForCasting);
+                    }
+                } else {
+                    if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+                            outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+                        outputVal.assign(inputVal);
+                    }
+                }
+
+            }
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            switch (aggType) {
+                case NULL: {
+                    out.writeByte(ATypeTag.NULL.serialize());
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    finishSystemNull();
+                    break;
+                }
+                default: {
+                    out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    protected boolean skipStep() {
+        return false;
+    }
+
+    protected abstract void processNull();
+
+    protected abstract void processSystemNull() throws AlgebricksException;
+
+    protected abstract void finishSystemNull() throws IOException;
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
new file mode 100644
index 0000000..1332dd5
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunction {
+    protected DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private double sum;
+    protected ATypeTag aggType;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    @SuppressWarnings("rawtypes")
+    protected ISerializerDeserializer serde;
+
+    public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider)
+            throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        sum = 0.0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        if (skipStep()) {
+            return;
+        }
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag
+                    + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
+        }
+
+        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                break;
+            }
+            case SYSTEM_NULL: {
+                processSystemNull();
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            switch (aggType) {
+                case INT8: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+                    aInt8.setValue((byte) sum);
+                    serde.serialize(aInt8, out);
+                    break;
+                }
+                case INT16: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+                    aInt16.setValue((short) sum);
+                    serde.serialize(aInt16, out);
+                    break;
+                }
+                case INT32: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+                    aInt32.setValue((int) sum);
+                    serde.serialize(aInt32, out);
+                    break;
+                }
+                case INT64: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+                    aInt64.setValue((long) sum);
+                    serde.serialize(aInt64, out);
+                    break;
+                }
+                case FLOAT: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+                    aFloat.setValue((float) sum);
+                    serde.serialize(aFloat, out);
+                    break;
+                }
+                case DOUBLE: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    aDouble.setValue(sum);
+                    serde.serialize(aDouble, out);
+                    break;
+                }
+                case NULL: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    finishSystemNull();
+                    break;
+                }
+                default:
+                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
+                            + aggType + "). ");
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    protected boolean skipStep() {
+        return false;
+    }
+    protected abstract void processNull();
+    protected abstract void processSystemNull() throws AlgebricksException;
+    protected abstract void finishSystemNull() throws IOException;
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index 7815606..3ff836a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -15,51 +15,16 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class AvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -78,165 +43,13 @@
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType tmpRecType;
-        try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = tmpRecType;
-
         return new ICopyAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private double sum;
-                    private long count;
-                    private ATypeTag aggType;
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval = new ClosedRecordConstructorEval(recType,
-                            new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, out);
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-
-                    @Override
-                    public void init() throws AlgebricksException {
-                        aggType = ATypeTag.SYSTEM_NULL;
-                        sum = 0.0;
-                        count = 0;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-                            aggType = ATypeTag.NULL;
-                            return;
-                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-                            aggType = typeTag;
-                        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-                            throw new AlgebricksException("Unexpected type " + typeTag
-                                    + " in aggregation input stream. Expected type " + aggType + ".");
-                        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                            aggType = typeTag;
-                        }
-
-                        if (typeTag != ATypeTag.SYSTEM_NULL) {
-                            ++count;
-                        }
-                        switch (typeTag) {
-                            case INT8: {
-                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT16: {
-                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT32: {
-                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT64: {
-                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case FLOAT: {
-                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case DOUBLE: {
-                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case NULL: {
-                                break;
-                            }
-                            default: {
-                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (count == 0 || aggType == ATypeTag.NULL) {
-                                nullSerde.serialize(ANull.NULL, out);
-                            } else {
-                                aDouble.setValue(sum / count);
-                                doubleSerde.serialize(aDouble, out);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        if (count == 0) {
-                            if (GlobalConfig.DEBUG) {
-                                GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
-                            }
-                        } else {
-                            try {
-                                if (aggType == ATypeTag.NULL) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(sum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt64.setValue(count);
-                                intSerde.serialize(aInt64, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
-                            }
-                        }
-                    }
-                };
+                return new AvgAggregateFunction(args, provider);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
new file mode 100644
index 0000000..2470e5a
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class AvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public AvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
index c63ab62..7c97fc2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
@@ -14,70 +14,21 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
  */
-public class CountAggregateFunction implements ICopyAggregateFunction {
-    private AMutableInt64 result = new AMutableInt64(-1);
-    @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
-    private long cnt;
-    private DataOutput out;
+public class CountAggregateFunction extends AbstractCountAggregateFunction {
 
     public CountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
-        eval = args[0].createEvaluator(inputVal);
-        out = output.getDataOutput();
+        super(args, output);
     }
 
-    @Override
-    public void init() {
-        cnt = 0;
+    protected void processNull() {
+        cnt++;
     }
 
-    @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        // Ignore SYSTEM_NULL.
-        if (typeTag != ATypeTag.SYSTEM_NULL) {
-            cnt++;
-        }
-    }
-
-    @Override
-    public void finish() throws AlgebricksException {
-        try {
-            result.setValue(cnt);
-            int64Serde.serialize(result, out);
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
-    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
index eb5d6bd..230e06b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
@@ -15,45 +15,16 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class GlobalAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -73,142 +44,13 @@
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
-        List<IAType> unionList = new ArrayList<IAType>();
-        unionList.add(BuiltinType.ANULL);
-        unionList.add(BuiltinType.ADOUBLE);
-        ARecordType tmpRecType;
-        try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
-        } catch (AsterixException e) {
-            throw new AlgebricksException(e);
-        }
-
-        final ARecordType recType = tmpRecType;
-
         return new ICopyAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private double globalSum;
-                    private long globalCount;
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval = new ClosedRecordConstructorEval(recType,
-                            new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, out);
-
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private boolean metNull;
-
-                    @Override
-                    public void init() {
-                        globalSum = 0.0;
-                        globalCount = 0;
-                        metNull = false;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        byte[] serBytes = inputVal.getByteArray();
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
-                        switch (typeTag) {
-                            case NULL: {
-                                metNull = true;
-                                break;
-                            }
-                            case SYSTEM_NULL: {
-                                // Ignore and return.
-                                return;
-                            }
-                            case RECORD: {
-                                // Expected.
-                                break;
-                            }
-                            default: {
-                                throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
-                            }
-                        }
-
-                        // The record length helps us determine whether the input record fields are nullable.
-                        int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
-                        int nullBitmapSize = 1;
-                        if (recordLength == 29) {
-                            nullBitmapSize = 0;
-                        }
-                        int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize,
-                                false);
-                        if (offset1 == 0) // the sum is null
-                            metNull = true;
-                        else
-                            globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
-                        int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize,
-                                false);
-                        if (offset2 != 0) // the count is not null
-                            globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (globalCount == 0 || metNull)
-                                nullSerde.serialize(ANull.NULL, out);
-                            else {
-                                aDouble.setValue(globalSum / globalCount);
-                                doubleSerde.serialize(aDouble, out);
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        try {
-                            if (metNull || globalCount == 0) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(globalSum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(globalCount);
-                            longSerde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-                };
+                return new GlobalAvgAggregateFunction(args, provider);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..a63d4bc
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GlobalAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public GlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processPartialResults(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..0808e81
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class GlobalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new GlobalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.GLOBAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new GlobalSqlAvgAggregateFunction(args, provider);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..02c3c2c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GlobalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public GlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processPartialResults(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
index 09a659c..fd98764 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
@@ -15,50 +15,16 @@
 
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class LocalAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -77,157 +43,13 @@
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
-
         return new ICopyAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                List<IAType> unionList = new ArrayList<IAType>();
-                unionList.add(BuiltinType.ANULL);
-                unionList.add(BuiltinType.ADOUBLE);
-                ARecordType tmpRecType;
-                try {
-                    tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                            new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
-                } catch (AsterixException e) {
-                    throw new AlgebricksException(e);
-                }
-
-                final ARecordType recType = tmpRecType;
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private ATypeTag aggType;
-                    private double sum;
-                    private long count;
-
-                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
-                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
-                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
-                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
-                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
-                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-                    private ClosedRecordConstructorEval recordEval = new ClosedRecordConstructorEval(recType,
-                            new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, out);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT64);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-
-                    @Override
-                    public void init() {
-                        aggType = ATypeTag.SYSTEM_NULL;
-                        sum = 0.0;
-                        count = 0;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                .deserialize(inputVal.getByteArray()[0]);
-                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-                            aggType = ATypeTag.NULL;
-                            return;
-                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-                            aggType = typeTag;
-                        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-                            throw new AlgebricksException("Unexpected type " + typeTag
-                                    + " in aggregation input stream. Expected type " + aggType + ".");
-                        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                            aggType = typeTag;
-                        }
-
-                        if (typeTag != ATypeTag.SYSTEM_NULL) {
-                            ++count;
-                        }
-
-                        switch (typeTag) {
-                            case INT8: {
-                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT16: {
-                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT32: {
-                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case INT64: {
-                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case FLOAT: {
-                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case DOUBLE: {
-                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                sum += val;
-                                break;
-                            }
-                            case NULL: {
-                                break;
-                            }
-                            default: {
-                                throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type "
-                                        + typeTag);
-                            }
-                        }
-                        inputVal.reset();
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (count == 0 && aggType != ATypeTag.NULL) {
-                                out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                                return;
-                            }
-                            if (aggType == ATypeTag.NULL) {
-                                sumBytes.reset();
-                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                            } else {
-                                sumBytes.reset();
-                                aDouble.setValue(sum);
-                                doubleSerde.serialize(aDouble, sumBytesOutput);
-                            }
-                            countBytes.reset();
-                            aInt64.setValue(count);
-                            longSerde.serialize(aInt64, countBytesOutput);
-                            recordEval.evaluate(null);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        finish();
-                    }
-                };
+                return new LocalAvgAggregateFunction(args, provider);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
new file mode 100644
index 0000000..2c7abd4
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class LocalAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public LocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+
+    @Override
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
index 9e411b6..c540c43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..6a1f60e
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.LOCAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new LocalSqlAvgAggregateFunction(args, provider);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..6a97410
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class LocalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public LocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishPartialResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..6b31244
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_MAX;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, false, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..4946703
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_MIN;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, true, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..1f47727
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_SUM;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlSumAggregateFunction(args, provider, true);
+            };
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
index 3e0b65c..0e35cd3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
index 8599913..0809701 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
@@ -14,164 +14,46 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
-import edu.uci.ics.asterix.om.types.hierachy.ITypePromoteComputer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class MinMaxAggregateFunction implements ICopyAggregateFunction {
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
-    private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
-    private DataOutput out;
-    private ICopyEvaluator eval;
-    private ATypeTag aggType;
-    private IBinaryComparator cmp;
-    private ITypePromoteComputer tpc;
-    private final boolean isMin;
+public class MinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
     private final boolean isLocalAgg;
 
     public MinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
             boolean isLocalAgg) throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
-        this.isMin = isMin;
+        super(args, provider, isMin);
         this.isLocalAgg = isLocalAgg;
     }
 
-    @Override
-    public void init() {
-        aggType = ATypeTag.SYSTEM_NULL;
-        outputVal.reset();
-        tempValForCasting.reset();
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
     }
 
     @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
+    protected boolean skipStep() {
+        return (aggType == ATypeTag.NULL);
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
         }
-        if (aggType == ATypeTag.SYSTEM_NULL) {
-            if (typeTag == ATypeTag.SYSTEM_NULL) {
-                // Ignore.
-                return;
-            }
-            // First value encountered. Set type, comparator, and initial value.
-            aggType = typeTag;
-            // Set comparator.
-            IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
-                    .getBinaryComparatorFactory(aggType, isMin);
-            cmp = cmpFactory.createBinaryComparator();
-            // Initialize min value.
-            outputVal.assign(inputVal);
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
-                    + aggType + ".");
+    }
+
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
         } else {
-
-            // If a system_null is encountered locally, it would be an error; otherwise if it is seen
-            // by a global aggregator, it is simple ignored.
-            if (typeTag == ATypeTag.SYSTEM_NULL) {
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                } else {
-                    return;
-                }
-            }
-
-            if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
-                aggType = typeTag;
-                cmp = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
-                        .createBinaryComparator();
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.promote(outputVal.getByteArray(), outputVal.getStartOffset() + 1,
-                                outputVal.getLength() - 1, tempValForCasting);
-                    } catch (IOException e) {
-                        throw new AlgebricksException(e);
-                    }
-                    outputVal.reset();
-                    outputVal.assign(tempValForCasting);
-                }
-                if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
-                        outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                    outputVal.assign(inputVal);
-                }
-
-            } else {
-                tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.promote(inputVal.getByteArray(), inputVal.getStartOffset() + 1, inputVal.getLength() - 1,
-                                tempValForCasting);
-                    } catch (IOException e) {
-                        throw new AlgebricksException(e);
-                    }
-                    if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(),
-                            tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(),
-                            outputVal.getLength()) < 0) {
-                        outputVal.assign(tempValForCasting);
-                    }
-                } else {
-                    if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
-                            outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                        outputVal.assign(inputVal);
-                    }
-                }
-
-            }
+            out.writeByte(ATypeTag.NULL.serialize());
         }
     }
 
-    @Override
-    public void finish() throws AlgebricksException {
-        try {
-            switch (aggType) {
-                case NULL: {
-                    out.writeByte(ATypeTag.NULL.serialize());
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        out.writeByte(ATypeTag.NULL.serialize());
-                    }
-                    break;
-                }
-                default: {
-                    out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
-                    break;
-                }
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
-    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..cf3f3dd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_AVG;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlAvgAggregateFunction(args, provider);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
new file mode 100644
index 0000000..11cf13f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public SqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        processDataValues(tuple);
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        finishFinalResults();
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..b484b53
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+/**
+ * NULLs are also counted.
+ */
+public class SqlCountAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_COUNT;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlCountAggregateFunction(args, provider);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java
new file mode 100644
index 0000000..5fb5d72
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+/**
+ * COUNT returns the number of non-null items in the given list. Note that COUNT(NULL) is not allowed.
+ */
+public class SqlCountAggregateFunction extends AbstractCountAggregateFunction {
+
+    public SqlCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..c8e415f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_MAX;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, false, false);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..fe934f8
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_MIN;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, true, false);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
new file mode 100644
index 0000000..56236e3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
+    private final boolean isLocalAgg;
+
+    public SqlMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
+            boolean isLocalAgg) throws AlgebricksException {
+        super(args, provider, isMin);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    protected void processNull() {
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            out.writeByte(ATypeTag.NULL.serialize());
+        }
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..294d9c1
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_SUM;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlSumAggregateFunction(args, provider, false);
+            };
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
new file mode 100644
index 0000000..4418288
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlSumAggregateFunction extends AbstractSumAggregateFunction {
+    private final boolean isLocalAgg;
+    
+    public SqlSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+            throws AlgebricksException {
+        super(args, provider);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    protected void processNull() {
+    }
+
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
+        }
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 31ad055..42b0346 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -14,202 +14,54 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
 import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SumAggregateFunction implements ICopyAggregateFunction {
-    private DataOutput out;
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
-    private double sum;
-    private ATypeTag aggType;
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableFloat aFloat = new AMutableFloat(0);
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-    @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer serde;
-
+public class SumAggregateFunction extends AbstractSumAggregateFunction {
     private final boolean isLocalAgg;
 
     public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
             throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
+        super(args, provider);
         this.isLocalAgg = isLocalAgg;
     }
 
     @Override
-    public void init() {
-        aggType = ATypeTag.SYSTEM_NULL;
-        sum = 0.0;
+    protected boolean skipStep() { 
+        return (aggType == ATypeTag.NULL);
     }
 
     @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
-        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-            aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-            throw new AlgebricksException("Unexpected type " + typeTag
-                    + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
-        }
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
 
-        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-            aggType = typeTag;
-        }
-
-        switch (typeTag) {
-            case INT8: {
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT16: {
-                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT32: {
-                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT64: {
-                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case FLOAT: {
-                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case DOUBLE: {
-                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case NULL: {
-                break;
-            }
-            case SYSTEM_NULL: {
-                // For global aggregates simply ignore system null here,
-                // but if all input value are system null, then we should return
-                // null in finish().
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                }
-                break;
-            }
-            default: {
-                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
-            }
+    @Override
+    protected void processSystemNull() throws AlgebricksException {
+        // For global aggregates simply ignore system null here,
+        // but if all input value are system null, then we should return
+        // null in finish().
+        if (isLocalAgg) {
+            throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
         }
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void finish() throws AlgebricksException {
-        try {
-            switch (aggType) {
-                case INT8: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
-                    aInt8.setValue((byte) sum);
-                    serde.serialize(aInt8, out);
-                    break;
-                }
-                case INT16: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
-                    aInt16.setValue((short) sum);
-                    serde.serialize(aInt16, out);
-                    break;
-                }
-                case INT32: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-                    aInt32.setValue((int) sum);
-                    serde.serialize(aInt32, out);
-                    break;
-                }
-                case INT64: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-                    aInt64.setValue((long) sum);
-                    serde.serialize(aInt64, out);
-                    break;
-                }
-                case FLOAT: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
-                    aFloat.setValue((float) sum);
-                    serde.serialize(aFloat, out);
-                    break;
-                }
-                case DOUBLE: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    aDouble.setValue(sum);
-                    serde.serialize(aDouble, out);
-                    break;
-                }
-                case NULL: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                        serde.serialize(ANull.NULL, out);
-                    }
-                    break;
-                }
-                default:
-                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
-                            + aggType + "). ");
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
+    protected void finishSystemNull() throws IOException {
+        // Empty stream. For local agg return system null. For global agg return null.
+        if (isLocalAgg) {
+            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+        } else {
+            serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+            serde.serialize(ANull.NULL, out);
         }
     }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
-    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index ea75c77..5ae2459 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -65,22 +65,43 @@
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.SumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.stream.EmptyStreamAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.stream.NonEmptyStreamAggregateDescriptor;
@@ -426,6 +447,33 @@
         temp.add(ScalarMaxAggregateDescriptor.FACTORY);
         temp.add(ScalarMinAggregateDescriptor.FACTORY);
 
+        // SQL aggregates
+        temp.add(SqlCountAggregateDescriptor.FACTORY);
+        temp.add(SqlAvgAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(GlobalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SqlSumAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlSumAggregateDescriptor.FACTORY);
+        temp.add(SqlMaxAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlMaxAggregateDescriptor.FACTORY);
+        temp.add(SqlMinAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlMinAggregateDescriptor.FACTORY);
+
+        // SQL serializable aggregates
+        temp.add(SerializableSqlCountAggregateDescriptor.FACTORY);
+        temp.add(SerializableSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableLocalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableGlobalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableSqlSumAggregateDescriptor.FACTORY);
+        temp.add(SerializableLocalSqlSumAggregateDescriptor.FACTORY);
+
+        // SQL scalar aggregates
+        temp.add(ScalarSqlCountAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlSumAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlMaxAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlMinAggregateDescriptor.FACTORY);
+
         // new functions - constructors
         temp.add(ABooleanConstructorDescriptor.FACTORY);
         temp.add(ANullConstructorDescriptor.FACTORY);